Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/08/13 00:35:23 UTC
svn commit: r685353 [7/13] - in /hadoop/core/trunk: ./ src/contrib/chukwa/
src/contrib/chukwa/bin/ src/contrib/chukwa/build/ src/contrib/chukwa/conf/
src/contrib/chukwa/dist/ src/contrib/chukwa/docs/
src/contrib/chukwa/docs/paper/ src/contrib/chukwa/ha...
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChukwaArchiveKey.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChukwaArchiveKey.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChukwaArchiveKey.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChukwaArchiveKey.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,286 @@
+// File generated by hadoop record compiler. Do not edit.
+package org.apache.hadoop.chukwa;
+
+public class ChukwaArchiveKey extends org.apache.hadoop.record.Record {
+ private static final org.apache.hadoop.record.meta.RecordTypeInfo _rio_recTypeInfo;
+ private static org.apache.hadoop.record.meta.RecordTypeInfo _rio_rtiFilter;
+ private static int[] _rio_rtiFilterFields;
+ static {
+ _rio_recTypeInfo = new org.apache.hadoop.record.meta.RecordTypeInfo("ChukwaArchiveKey");
+ _rio_recTypeInfo.addField("timePartition", org.apache.hadoop.record.meta.TypeID.LongTypeID);
+ _rio_recTypeInfo.addField("dataType", org.apache.hadoop.record.meta.TypeID.StringTypeID);
+ _rio_recTypeInfo.addField("streamName", org.apache.hadoop.record.meta.TypeID.StringTypeID);
+ _rio_recTypeInfo.addField("seqId", org.apache.hadoop.record.meta.TypeID.LongTypeID);
+ }
+
+ private long timePartition;
+ private String dataType;
+ private String streamName;
+ private long seqId;
+ public ChukwaArchiveKey() { }
+ public ChukwaArchiveKey(
+ final long timePartition,
+ final String dataType,
+ final String streamName,
+ final long seqId) {
+ this.timePartition = timePartition;
+ this.dataType = dataType;
+ this.streamName = streamName;
+ this.seqId = seqId;
+ }
+ public static org.apache.hadoop.record.meta.RecordTypeInfo getTypeInfo() {
+ return _rio_recTypeInfo;
+ }
+ public static void setTypeFilter(org.apache.hadoop.record.meta.RecordTypeInfo rti) {
+ if (null == rti) return;
+ _rio_rtiFilter = rti;
+ _rio_rtiFilterFields = null;
+ }
+ private static void setupRtiFields()
+ {
+ if (null == _rio_rtiFilter) return;
+ // we may already have done this
+ if (null != _rio_rtiFilterFields) return;
+ int _rio_i, _rio_j;
+ _rio_rtiFilterFields = new int [_rio_rtiFilter.getFieldTypeInfos().size()];
+ for (_rio_i=0; _rio_i<_rio_rtiFilterFields.length; _rio_i++) {
+ _rio_rtiFilterFields[_rio_i] = 0;
+ }
+ java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_itFilter = _rio_rtiFilter.getFieldTypeInfos().iterator();
+ _rio_i=0;
+ while (_rio_itFilter.hasNext()) {
+ org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfoFilter = _rio_itFilter.next();
+ java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_it = _rio_recTypeInfo.getFieldTypeInfos().iterator();
+ _rio_j=1;
+ while (_rio_it.hasNext()) {
+ org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfo = _rio_it.next();
+ if (_rio_tInfo.equals(_rio_tInfoFilter)) {
+ _rio_rtiFilterFields[_rio_i] = _rio_j;
+ break;
+ }
+ _rio_j++;
+ }
+ _rio_i++;
+ }
+ }
+ public long getTimePartition() {
+ return timePartition;
+ }
+ public void setTimePartition(final long timePartition) {
+ this.timePartition=timePartition;
+ }
+ public String getDataType() {
+ return dataType;
+ }
+ public void setDataType(final String dataType) {
+ this.dataType=dataType;
+ }
+ public String getStreamName() {
+ return streamName;
+ }
+ public void setStreamName(final String streamName) {
+ this.streamName=streamName;
+ }
+ public long getSeqId() {
+ return seqId;
+ }
+ public void setSeqId(final long seqId) {
+ this.seqId=seqId;
+ }
+ public void serialize(final org.apache.hadoop.record.RecordOutput _rio_a, final String _rio_tag)
+ throws java.io.IOException {
+ _rio_a.startRecord(this,_rio_tag);
+ _rio_a.writeLong(timePartition,"timePartition");
+ _rio_a.writeString(dataType,"dataType");
+ _rio_a.writeString(streamName,"streamName");
+ _rio_a.writeLong(seqId,"seqId");
+ _rio_a.endRecord(this,_rio_tag);
+ }
+ private void deserializeWithoutFilter(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
+ throws java.io.IOException {
+ _rio_a.startRecord(_rio_tag);
+ timePartition=_rio_a.readLong("timePartition");
+ dataType=_rio_a.readString("dataType");
+ streamName=_rio_a.readString("streamName");
+ seqId=_rio_a.readLong("seqId");
+ _rio_a.endRecord(_rio_tag);
+ }
+ public void deserialize(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
+ throws java.io.IOException {
+ if (null == _rio_rtiFilter) {
+ deserializeWithoutFilter(_rio_a, _rio_tag);
+ return;
+ }
+ // if we're here, we need to read based on version info
+ _rio_a.startRecord(_rio_tag);
+ setupRtiFields();
+ for (int _rio_i=0; _rio_i<_rio_rtiFilter.getFieldTypeInfos().size(); _rio_i++) {
+ if (1 == _rio_rtiFilterFields[_rio_i]) {
+ timePartition=_rio_a.readLong("timePartition");
+ }
+ else if (2 == _rio_rtiFilterFields[_rio_i]) {
+ dataType=_rio_a.readString("dataType");
+ }
+ else if (3 == _rio_rtiFilterFields[_rio_i]) {
+ streamName=_rio_a.readString("streamName");
+ }
+ else if (4 == _rio_rtiFilterFields[_rio_i]) {
+ seqId=_rio_a.readLong("seqId");
+ }
+ else {
+ java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo> typeInfos = (java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo>)(_rio_rtiFilter.getFieldTypeInfos());
+ org.apache.hadoop.record.meta.Utils.skip(_rio_a, typeInfos.get(_rio_i).getFieldID(), typeInfos.get(_rio_i).getTypeID());
+ }
+ }
+ _rio_a.endRecord(_rio_tag);
+ }
+ public int compareTo (final Object _rio_peer_) throws ClassCastException {
+ if (!(_rio_peer_ instanceof ChukwaArchiveKey)) {
+ throw new ClassCastException("Comparing different types of records.");
+ }
+ ChukwaArchiveKey _rio_peer = (ChukwaArchiveKey) _rio_peer_;
+ int _rio_ret = 0;
+ _rio_ret = (timePartition == _rio_peer.timePartition)? 0 :((timePartition<_rio_peer.timePartition)?-1:1);
+ if (_rio_ret != 0) return _rio_ret;
+ _rio_ret = dataType.compareTo(_rio_peer.dataType);
+ if (_rio_ret != 0) return _rio_ret;
+ _rio_ret = streamName.compareTo(_rio_peer.streamName);
+ if (_rio_ret != 0) return _rio_ret;
+ _rio_ret = (seqId == _rio_peer.seqId)? 0 :((seqId<_rio_peer.seqId)?-1:1);
+ if (_rio_ret != 0) return _rio_ret;
+ return _rio_ret;
+ }
+ public boolean equals(final Object _rio_peer_) {
+ if (!(_rio_peer_ instanceof ChukwaArchiveKey)) {
+ return false;
+ }
+ if (_rio_peer_ == this) {
+ return true;
+ }
+ ChukwaArchiveKey _rio_peer = (ChukwaArchiveKey) _rio_peer_;
+ boolean _rio_ret = false;
+ _rio_ret = (timePartition==_rio_peer.timePartition);
+ if (!_rio_ret) return _rio_ret;
+ _rio_ret = dataType.equals(_rio_peer.dataType);
+ if (!_rio_ret) return _rio_ret;
+ _rio_ret = streamName.equals(_rio_peer.streamName);
+ if (!_rio_ret) return _rio_ret;
+ _rio_ret = (seqId==_rio_peer.seqId);
+ if (!_rio_ret) return _rio_ret;
+ return _rio_ret;
+ }
+ public Object clone() throws CloneNotSupportedException {
+ ChukwaArchiveKey _rio_other = new ChukwaArchiveKey();
+ _rio_other.timePartition = this.timePartition;
+ _rio_other.dataType = this.dataType;
+ _rio_other.streamName = this.streamName;
+ _rio_other.seqId = this.seqId;
+ return _rio_other;
+ }
+ public int hashCode() {
+ int _rio_result = 17;
+ int _rio_ret;
+ _rio_ret = (int) (timePartition^(timePartition>>>32));
+ _rio_result = 37*_rio_result + _rio_ret;
+ _rio_ret = dataType.hashCode();
+ _rio_result = 37*_rio_result + _rio_ret;
+ _rio_ret = streamName.hashCode();
+ _rio_result = 37*_rio_result + _rio_ret;
+ _rio_ret = (int) (seqId^(seqId>>>32));
+ _rio_result = 37*_rio_result + _rio_ret;
+ return _rio_result;
+ }
+ public static String signature() {
+ return "LChukwaArchiveKey(lssl)";
+ }
+ public static class Comparator extends org.apache.hadoop.record.RecordComparator {
+ public Comparator() {
+ super(ChukwaArchiveKey.class);
+ }
+ static public int slurpRaw(byte[] b, int s, int l) {
+ try {
+ int os = s;
+ {
+ long i = org.apache.hadoop.record.Utils.readVLong(b, s);
+ int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+ s+=z; l-=z;
+ }
+ {
+ int i = org.apache.hadoop.record.Utils.readVInt(b, s);
+ int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+ s+=(z+i); l-= (z+i);
+ }
+ {
+ int i = org.apache.hadoop.record.Utils.readVInt(b, s);
+ int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+ s+=(z+i); l-= (z+i);
+ }
+ {
+ long i = org.apache.hadoop.record.Utils.readVLong(b, s);
+ int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+ s+=z; l-=z;
+ }
+ return (os - s);
+ } catch(java.io.IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ static public int compareRaw(byte[] b1, int s1, int l1,
+ byte[] b2, int s2, int l2) {
+ try {
+ int os1 = s1;
+ {
+ long i1 = org.apache.hadoop.record.Utils.readVLong(b1, s1);
+ long i2 = org.apache.hadoop.record.Utils.readVLong(b2, s2);
+ if (i1 != i2) {
+ return ((i1-i2) < 0) ? -1 : 0;
+ }
+ int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+ int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+ s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+ }
+ {
+ int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1);
+ int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2);
+ int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+ int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+ s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+ int r1 = org.apache.hadoop.record.Utils.compareBytes(b1,s1,i1,b2,s2,i2);
+ if (r1 != 0) { return (r1<0)?-1:0; }
+ s1+=i1; s2+=i2; l1-=i1; l2-=i2;
+ }
+ {
+ int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1);
+ int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2);
+ int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+ int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+ s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+ int r1 = org.apache.hadoop.record.Utils.compareBytes(b1,s1,i1,b2,s2,i2);
+ if (r1 != 0) { return (r1<0)?-1:0; }
+ s1+=i1; s2+=i2; l1-=i1; l2-=i2;
+ }
+ {
+ long i1 = org.apache.hadoop.record.Utils.readVLong(b1, s1);
+ long i2 = org.apache.hadoop.record.Utils.readVLong(b2, s2);
+ if (i1 != i2) {
+ return ((i1-i2) < 0) ? -1 : 0;
+ }
+ int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+ int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+ s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+ }
+ return (os1 - s1);
+ } catch(java.io.IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ public int compare(byte[] b1, int s1, int l1,
+ byte[] b2, int s2, int l2) {
+ int ret = compareRaw(b1,s1,l1,b2,s2,l2);
+ return (ret == -1)? -1 : ((ret==0)? 1 : 0);}
+ }
+
+ static {
+ org.apache.hadoop.record.RecordComparator.define(ChukwaArchiveKey.class, new Comparator());
+ }
+}
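
For reference, a minimal sketch of using this generated key with the record
framework's binary serialization (field values are hypothetical; assumes
org.apache.hadoop.record.BinaryRecordOutput, the standard binary RecordOutput):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.chukwa.ChukwaArchiveKey;
    import org.apache.hadoop.record.BinaryRecordOutput;

    public class ArchiveKeyExample {
      public static void main(String[] args) throws IOException {
        // Hypothetical values: an hour-sized time bucket (in ms), a data type,
        // a stream name, and a sequence ID.
        ChukwaArchiveKey key = new ChukwaArchiveKey(
            3600L * 1000, "SysLog", "host01:/var/log/messages", 42L);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        key.serialize(new BinaryRecordOutput(bytes), "key");
        System.out.println("serialized " + bytes.size() + " bytes");
      }
    }
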
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/Chunk.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/Chunk.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/Chunk.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/Chunk.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.chukwa.datacollection.adaptor.*;
+
+/**
+ * A chunk is a sequence of bytes at a particular logical offset in a stream,
+ * containing one or more "records".
+ * Chunks have various metadata, such as source, format,
+ * and pointers to record boundaries within the chunk.
+ *
+ */
+public interface Chunk {
+
+//these conceptually are really network addresses
+ public String getSource();
+ public void setSource(String logSource);
+
+ /**
+ * Get the name of the stream that this Chunk is part of.
+ * @return the name of this stream; e.g. a file name
+ */
+ public String getStreamName();
+ public void setStreamName(String streamName);
+
+ public String getApplication();
+ public void setApplication(String a);
+
+ //These describe the format of the data buffer
+ public String getDataType();
+ public void setDataType(String t);
+
+ /**
+ * @return the user data in the chunk
+ */
+ public byte[] getData();
+ /**
+ * @param logEvent the user data in the chunk
+ */
+ public void setData(byte[] logEvent);
+
+ /**
+ * get/set the <b>end</b> offsets of records in the buffer.
+ *
+ * We use end, rather than start offsets, since the first start
+ * offset is always 0, but the last end offset specifies how much of the buffer is valid.
+ *
+ * More precisely, offsets[i] is the offset in the Chunk of the last byte of record i
+ * in this chunk.
+ * @return a list of record end offsets
+ */
+ public int[] getRecordOffsets();
+ public void setRecordOffsets(int[] offsets);
+
+ /**
+ * @return the byte offset of the first byte not in this chunk.
+ *
+ * We pick this convention so that subtracting sequence IDs yields length.
+ */
+ public long getSeqID();
+ public void setSeqID(long l);
+
+ /**
+ * Retrieve a reference to the adaptor that sent this event.
+ * Used by LocalAgent and Connectors to deliver acks to the appropriate place.
+ */
+ public Adaptor getInitiator();
+
+ /**
+ * Estimate the size of this Chunk on the wire, assuming each char of metadata takes two bytes
+ * to serialize. This is pessimistic.
+ * @return size in bytes that this Chunk might take once serialized.
+ */
+ public int getSerializedSizeEstimate();
+
+ public void write(DataOutput data) throws IOException;
+}
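
To make the end-offset convention concrete, a hypothetical consumer could walk
a Chunk's records like this (a sketch, not part of the commit):

    import org.apache.hadoop.chukwa.Chunk;

    public class RecordWalker {
      // Prints each record, using the convention that offsets[i] is the
      // offset of the LAST byte of record i within the chunk's data buffer.
      public static void printRecords(Chunk chunk) {
        byte[] data = chunk.getData();
        int[] ends = chunk.getRecordOffsets();
        int start = 0;
        for (int end : ends) {
          System.out.println(new String(data, start, end - start + 1));
          start = end + 1; // next record begins right after this one's last byte
        }
      }
    }
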
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkBuilder.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkBuilder.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkBuilder.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.util.*;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import java.io.*;
+
+/**
+ * Right now, just handles record collection: callers append records one at a time, then retrieve them bundled into a single Chunk.
+ *
+ */
+public class ChunkBuilder {
+
+ ArrayList<Integer> recOffsets = new ArrayList<Integer>();
+ int lastRecOffset = -1;
+ DataOutputBuffer buf = new DataOutputBuffer();
+ /**
+ * Adds the data in rec to an internal buffer; rec can be reused immediately.
+ * @param rec the record data to append
+ */
+ public void addRecord(byte[] rec) {
+ lastRecOffset = lastRecOffset + rec.length;
+ recOffsets.add(lastRecOffset);
+ try {
+ buf.write(rec);
+ } catch(IOException e) {
+ throw new RuntimeException("buffer write failed. Out of memory?", e);
+ }
+ }
+
+ public Chunk getChunk() {
+ ChunkImpl c = new ChunkImpl();
+ c.setData(buf.getData());
+ c.setSeqID(buf.getLength());
+ int[] offsets = new int[recOffsets.size()];
+ for(int i = 0; i < offsets.length; ++i)
+ offsets[i] = recOffsets.get(i);
+ c.setRecordOffsets(offsets);
+
+ return c;
+ }
+
+
+
+}
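
Typical use, sketched (record contents are made up):

    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.chukwa.ChunkBuilder;

    public class BuilderExample {
      public static void main(String[] args) {
        ChunkBuilder cb = new ChunkBuilder();
        cb.addRecord("first record\n".getBytes());  // data is copied, so the
        cb.addRecord("second record\n".getBytes()); // caller may reuse buffers
        Chunk c = cb.getChunk();
        assert c.getRecordOffsets().length == 2; // one end offset per record
      }
    }
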
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkImpl.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkImpl.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkImpl.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/ChunkImpl.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+
+public class ChunkImpl implements org.apache.hadoop.io.Writable, Chunk
+{
+
+ private String source = "";
+ private String application = "";
+ private String dataType = "";
+ private byte[] data = null;
+ private int[] recordEndOffsets;
+
+ private String debuggingInfo="";
+
+ private transient Adaptor initiator;
+ long seqID;
+
+ ChunkImpl() {
+ }
+
+ public static ChunkImpl getBlankChunk() {
+ return new ChunkImpl();
+ }
+
+ public ChunkImpl(String dataType, String streamName, long seq, byte[] data, Adaptor source) {
+ this.seqID = seq;
+ this.application = streamName;
+ this.dataType = dataType;
+ this.data = data;
+ this.initiator = source;
+ this.source = localHostAddr;
+ }
+
+ /**
+ * @see org.apache.hadoop.chukwa.Chunk#getData()
+ */
+ public byte[] getData() {
+ return data;
+ }
+
+ /**
+ * @see org.apache.hadoop.chukwa.Chunk#setData(byte[])
+ */
+ public void setData(byte[] logEvent) {
+ this.data = logEvent;
+ }
+
+ /**
+ * @see org.apache.hadoop.chukwa.Chunk#getStreamName()
+ */
+ public String getStreamName() {
+ return application;
+ }
+
+ public void setStreamName(String logApplication) {
+ this.application = logApplication;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public void setSource(String logSource) {
+ this.source = logSource;
+ }
+
+ public String getDebugInfo() {
+ return debuggingInfo;
+ }
+
+ public void setDebugInfo(String a) {
+ this.debuggingInfo = a;
+ }
+
+ /**
+ * @see org.apache.hadoop.chukwa.Chunk#getSeqID()
+ */
+ public long getSeqID() {
+ return seqID;
+ }
+
+ public void setSeqID(long l) {
+ seqID=l;
+ }
+
+ public String getApplication(){
+ return application;
+ }
+
+ public void setApplication(String a){
+ application = a;
+ }
+
+ public Adaptor getInitiator() {
+ return initiator;
+ }
+
+ public void setInitiator(Adaptor a) {
+ initiator = a;
+ }
+
+
+ public void setLogSource() {
+ source = localHostAddr;
+ }
+
+ public int[] getRecordOffsets() {
+
+ if(recordEndOffsets == null)
+ recordEndOffsets = new int[] {data.length -1};
+ return recordEndOffsets;
+ }
+
+ public void setRecordOffsets(int[] offsets) {
+ recordEndOffsets = offsets;
+ }
+
+ public String getDataType() {
+ return dataType;
+ }
+
+ public void setDataType(String t) {
+ dataType = t;
+ }
+
+ /**
+ * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ */
+ public void readFields(DataInput in) throws IOException {
+ setSeqID(in.readLong());
+ setSource(in.readUTF());
+ setApplication(in.readUTF());
+ setDataType(in.readUTF());
+ setDebugInfo(in.readUTF());
+
+ int numRecords = in.readInt();
+ recordEndOffsets = new int[numRecords];
+ for(int i=0; i < numRecords; ++i)
+ recordEndOffsets[i] = in.readInt();
+ data = new byte[recordEndOffsets[recordEndOffsets.length - 1] + 1];
+ in.readFully(data);
+
+ }
+ /**
+ * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+ */
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(seqID);
+ out.writeUTF(source);
+ out.writeUTF(application);
+ out.writeUTF(dataType);
+ out.writeUTF(debuggingInfo);
+
+ if(recordEndOffsets == null)
+ recordEndOffsets = new int[] {data.length -1};
+
+ out.writeInt(recordEndOffsets.length);
+ for(int i =0; i < recordEndOffsets.length; ++i)
+ out.writeInt(recordEndOffsets[i]);
+
+ out.write(data, 0, recordEndOffsets[recordEndOffsets.length -1] + 1); //byte at last offset is valid
+ }
+
+ public static ChunkImpl read(DataInput in) throws IOException {
+ ChunkImpl w = new ChunkImpl();
+ w.readFields(in);
+ return w;
+ }
+
+ //FIXME: should do something better here, but this is OK for debugging
+ public String toString() {
+ return source+":" + application +":"+ new String(data)+ "/"+seqID;
+ }
+
+ private static String localHostAddr;
+ static
+ {
+ try {
+ localHostAddr = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ localHostAddr = "localhost";
+ }
+ }
+
+ /**
+ * @see org.apache.hadoop.chukwa.Chunk#getSerializedSizeEstimate()
+ */
+ public int getSerializedSizeEstimate() {
+ int size= 2 * (source.length() + application.length() +
+ dataType.length() + debuggingInfo.length()); //length of strings (pessimistic)
+ size += data.length + 4;
+ if(recordEndOffsets == null)
+ size+=8;
+ else
+ size += 4 * (recordEndOffsets.length + 1); //+1 for length of array
+ size += 8; //seqID (a long)
+ return size;
+ }
+
+ public void setRecordOffsets(java.util.Collection<Integer> carriageReturns)
+ {
+ recordEndOffsets = new int [carriageReturns.size()];
+ int i = 0;
+ for(Integer offset:carriageReturns )
+ recordEndOffsets[i++] = offset;
+ }
+
+}
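
Since ChunkImpl is a Writable, it round-trips through the standard Hadoop I/O
buffers. A sketch (hypothetical values; null stands in for the initiating
adaptor):

    import java.io.IOException;
    import org.apache.hadoop.chukwa.ChunkImpl;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class ChunkRoundTrip {
      public static void main(String[] args) throws IOException {
        ChunkImpl c = new ChunkImpl("SysLog", "/var/log/messages", 17L,
                                    "a log line\n".getBytes(), null);
        DataOutputBuffer out = new DataOutputBuffer();
        c.write(out);                         // serialize
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        ChunkImpl back = ChunkImpl.read(in);  // deserialize
        assert back.getSeqID() == c.getSeqID();
      }
    }
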
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/conf/ChukwaConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/conf/ChukwaConfiguration.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/conf/ChukwaConfiguration.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/conf/ChukwaConfiguration.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.conf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+public class ChukwaConfiguration extends Configuration {
+ static Logger log = Logger.getLogger(ChukwaConfiguration.class);
+
+ public ChukwaConfiguration() {
+ this(true);
+ }
+
+ public ChukwaConfiguration(boolean loadDefaults) {
+ super();
+ if (loadDefaults) {
+ String chukwaHome = System.getenv("CHUKWA_HOME");
+ if (chukwaHome == null)
+ chukwaHome = ".";
+ log.info("chukwaHome is " + chukwaHome);
+
+ super.addResource(new Path(chukwaHome + "/conf/chukwa-collector-conf.xml"));
+ log.debug("added chukwa-collector-conf.xml to ChukwaConfiguration");
+
+ super.addResource(new Path(chukwaHome + "/conf/chukwa-agent-conf.xml"));
+ log.debug("added chukwa-agent-conf.xml to ChukwaConfiguration");
+ }
+ }
+
+}
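
Usage is the same as for any Hadoop Configuration; a sketch (the property name
below is made up for illustration):

    import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;

    public class ConfExample {
      public static void main(String[] args) {
        // Loads $CHUKWA_HOME/conf/chukwa-collector-conf.xml and
        // chukwa-agent-conf.xml, defaulting CHUKWA_HOME to ".".
        ChukwaConfiguration conf = new ChukwaConfiguration();
        String v = conf.get("chukwa.example.setting", "default-value");
        System.out.println(v);
      }
    }
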
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection;
+
+import java.util.List;
+
+import org.apache.hadoop.chukwa.Chunk;
+
+/**
+ * A generic interface for queues of Chunks.
+ *
+ * Differs from a normal queue interface primarily by having collect().
+ */
+public interface ChunkQueue extends ChunkReceiver
+{
+ /**
+ * Add a chunk to the queue, blocking if the queue is full.
+ * @param event
+ * @throws InterruptedException if thread is interrupted while blocking
+ */
+ public void add(Chunk event) throws InterruptedException;
+
+ /**
+ * Transfer at least one, and no more than count, Chunks into events.
+ * Blocks if the queue is empty.
+ */
+ public void collect(List<Chunk> events,int count) throws InterruptedException;
+
+ /**
+ * Return an approximation of the number of chunks currently in the queue.
+ * No guarantees are made about the accuracy of this number.
+ */
+ public int size();
+}
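
A consumer draining the queue might look roughly like this (a sketch; process()
is a stand-in for real delivery logic, and the batch size of 100 is arbitrary):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.chukwa.datacollection.ChunkQueue;

    public class QueueDrainer {
      public static void drain(ChunkQueue queue) throws InterruptedException {
        List<Chunk> events = new ArrayList<Chunk>();
        while (true) {
          queue.collect(events, 100); // blocks until at least one chunk arrives
          for (Chunk c : events)
            process(c);
          events.clear(); // assuming collect() appends; clear between batches
        }
      }
      private static void process(Chunk c) { /* hypothetical delivery step */ }
    }
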
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,15 @@
+package org.apache.hadoop.chukwa.datacollection;
+
+import org.apache.hadoop.chukwa.Chunk;
+
+public interface ChunkReceiver {
+
+ /**
+ * Add a chunk to the queue, potentially blocking.
+ * @param event
+ * @throws InterruptedException if thread is interrupted while blocking
+ */
+ public void add(Chunk event) throws java.lang.InterruptedException;
+
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
+
+import org.apache.hadoop.chukwa.datacollection.agent.*;
+import org.apache.log4j.Logger;
+
+public class DataFactory
+{
+ static Logger log = Logger.getLogger(DataFactory.class);
+ static final int QUEUE_SIZE_KB = 10 * 1024;
+ private static DataFactory dataFactory = null;
+ private ChunkQueue chunkQueue = new MemLimitQueue(QUEUE_SIZE_KB * 1024);
+
+ static
+ {
+ dataFactory = new DataFactory();
+ }
+
+ private DataFactory()
+ {}
+
+ public static DataFactory getInstance() {
+ return dataFactory;
+ }
+
+ public ChunkQueue getEventQueue() {
+ return chunkQueue;
+ }
+
+ /**
+ * @return empty list if file does not exist
+ * @throws IOException on other error
+ */
+ public Iterator<String> getCollectors() throws IOException
+ {
+ String chukwaHome = System.getenv("CHUKWA_HOME");
+ if (chukwaHome == null){
+ chukwaHome = ".";
+ }
+ log.info("setting up collectors file: " + chukwaHome + "/conf/collectors");
+ File collectors = new File(chukwaHome + "/conf/collectors");
+ try{
+ return new RetryListOfCollectors(collectors, 1000 * 15);//time is ms between tries
+ } catch(java.io.IOException e) {
+ log.error("failed to read collectors file: ", e);
+ throw e;
+ }
+ }
+}
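
DataFactory is the process-wide singleton through which adaptors and connectors
find the shared chunk queue; for example:

    import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
    import org.apache.hadoop.chukwa.datacollection.DataFactory;

    public class FactoryExample {
      public static void main(String[] args) {
        // Every caller in the process sees the same queue instance.
        ChunkQueue q = DataFactory.getInstance().getEventQueue();
        System.out.println("approximate queue size: " + q.size());
      }
    }
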
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+
+/**
+ * An adaptor is a component that runs within the Local Agent, producing
+ * chunks of monitoring data.
+ *
+ * An adaptor can, but need not, have an associated thread. If an adaptor
+ * lacks a thread, it needs to arrange some mechanism (such as a callback)
+ * to periodically get control and send its reports.
+ *
+ * Adaptors must be able to stop and resume without losing data, using
+ * a byte offset in the stream.
+ *
+ * If an adaptor crashes at byte offset n, and is restarted at byte offset k,
+ * with k < n, it is allowed to send different values for bytes k through n the
+ * second time around. However, the stream must still be parseable, assuming that
+ * bytes 0-k come from the first run, and bytes k-n come from the second.
+ */
+public interface Adaptor
+{
+ /**
+ * Start this adaptor
+ * @param type the application type on whose behalf this adaptor is started
+ * @param status the status string to use for configuration.
+ * @param offset the stream offset of the first byte sent by this adaptor
+ * @throws AdaptorException
+ */
+ public void start(String type, String status, long offset, ChunkReceiver dest) throws AdaptorException;
+
+ /**
+ * Return the adaptor's state.
+ * Should not include class name, datatype or byte offset, which are written by caller.
+ * @return the adaptor state as a string
+ * @throws AdaptorException
+ */
+ public String getCurrentStatus() throws AdaptorException;
+ public String getType();
+ /**
+ * Signals this adaptor to come to an orderly stop.
+ * The adaptor ought to push out all the data it can
+ * before exiting.
+ *
+ * This method is synchronous:
+ * In other words, after shutdown() returns, no new data should be written.
+ *
+ * @return the logical offset at which the adaptor stops
+ * @throws AdaptorException
+ */
+ public long shutdown() throws AdaptorException;
+
+ /**
+ * Signals this adaptor to come to an abrupt stop, as quickly as it can.
+ * The use case here is "Whups, I didn't mean to start that adaptor tailing
+ * a gigabyte file, stop it now".
+ *
+ * Adaptors might need to do something nontrivial here, e.g., in the case in which
+ * they have registered periodic timer interrupts, or use a shared worker thread
+ * from which they need to disengage.
+ *
+ * This method is synchronous:
+ * In other words, after hardStop() returns, no new data should be written.
+ *
+ * @throws AdaptorException
+ */
+ public void hardStop() throws AdaptorException;
+
+}
\ No newline at end of file
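
A skeletal implementation illustrating the contract (a sketch only: it emits
one fixed chunk at start() and keeps the offset bookkeeping honest):

    package org.apache.hadoop.chukwa.datacollection.adaptor;

    import org.apache.hadoop.chukwa.ChunkImpl;
    import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;

    public class OneShotAdaptor implements Adaptor {
      private String type;
      private String status;
      private volatile long offset;

      public void start(String type, String status, long offset,
                        ChunkReceiver dest) throws AdaptorException {
        this.type = type;
        this.status = status;
        this.offset = offset;
        byte[] data = "hello\n".getBytes(); // fixed payload, for illustration
        this.offset += data.length; // seqID = offset of first byte NOT sent
        try {
          dest.add(new ChunkImpl(type, "oneshot", this.offset, data, this));
        } catch (InterruptedException e) {
          throw new AdaptorException(e);
        }
      }
      public String getCurrentStatus() { return status; } // no class name/offset
      public String getType() { return type; }
      public long shutdown() { return offset; } // offset at which we stopped
      public void hardStop() { } // nothing is running, so nothing to kill
    }
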
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+public class AdaptorException extends Exception
+{
+
+ private static final long serialVersionUID = -8490279345367308690L;
+
+ public AdaptorException()
+ {
+ super();
+ }
+
+ public AdaptorException(String arg0, Throwable arg1)
+ {
+ super(arg0, arg1);
+ }
+
+ public AdaptorException(String arg0)
+ {
+ super(arg0);
+ }
+
+ public AdaptorException(Throwable arg0)
+ {
+ super(arg0);
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
+import org.apache.log4j.helpers.ISO8601DateFormat;
+import org.json.JSONException;
+import org.json.JSONObject;
+import java.util.*;
+
+public class ExecAdaptor extends ExecPlugin implements Adaptor {
+
+ static final boolean FAKE_LOG4J_HEADER = true;
+
+ class RunToolTask extends TimerTask {
+ public void run() {
+ JSONObject o = execute();
+ try {
+
+ if(o.getInt("status") == statusKO)
+ hardStop();
+
+ //FIXME: downstream customers would like timestamps here.
+ //Doing that efficiently probably means cutting out all the
+ //excess buffer copies here, and appending into an OutputBuffer.
+ byte[] data;
+ if(FAKE_LOG4J_HEADER) {
+ StringBuilder result = new StringBuilder();
+ ISO8601DateFormat dateFormat = new org.apache.log4j.helpers.ISO8601DateFormat();
+ result.append(dateFormat.format(new java.util.Date()));
+ result.append(" INFO org.apache.hadoop.chukwa.");
+ result.append(type);
+ result.append(": ");
+ result.append(o.getString("stdout"));
+ data = result.toString().getBytes();
+ } else {
+ String stdout = o.getString("stdout");
+ data = stdout.getBytes();
+ }
+
+ ArrayList<Integer> carriageReturns = new ArrayList<Integer>();
+ for(int i = 0; i < data.length ; ++i)
+ if(data[i] == '\n')
+ carriageReturns.add(i);
+
+ sendOffset += data.length;
+ ChunkImpl c = new ChunkImpl(ExecAdaptor.this.type,
+ "results from " + cmd, sendOffset , data, ExecAdaptor.this);
+ c.setRecordOffsets(carriageReturns);
+ dest.add(c);
+ } catch(JSONException e ) {
+ //FIXME: log this somewhere
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ }catch(AdaptorException e ) {
+ //FIXME: log this somewhere
+ }
+ }
+ };
+
+ String cmd;
+ String type;
+ ChunkReceiver dest;
+ final java.util.Timer timer;
+ long period = 5 * 1000;
+ volatile long sendOffset = 0;
+
+ public ExecAdaptor() {
+ timer = new java.util.Timer();
+ }
+
+ @Override
+ public String getCurrentStatus() throws AdaptorException {
+ return cmd;
+ }
+
+ @Override
+ public void hardStop() throws AdaptorException {
+ super.stop();
+ timer.cancel();
+ }
+
+ @Override
+ public long shutdown() throws AdaptorException {
+ try {
+ timer.cancel();
+ super.waitFor(); //wait for last data to get pushed out
+ } catch(InterruptedException e) {
+ return sendOffset;
+ }
+ return sendOffset;
+ }
+
+ @Override
+ public void start(String type, String status, long offset, ChunkReceiver dest)
+ throws AdaptorException
+ {
+ cmd = status;
+ this.type = type;
+ this.dest = dest;
+ this.sendOffset = offset;
+
+ TimerTask exec = new RunToolTask();
+ timer.schedule(exec, 0L, period);
+ }
+
+ @Override
+ public String getCmde() {
+ return cmd;
+ }
+
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+}
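
Starting the adaptor, sketched: the status string carries the command line, and
its output is chunked every few seconds (period is 5000 ms in the code above;
the "Ps" label and command are made up):

    import org.apache.hadoop.chukwa.datacollection.DataFactory;
    import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
    import org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor;

    public class ExecExample {
      public static void main(String[] args) throws AdaptorException {
        ExecAdaptor ps = new ExecAdaptor();
        // type labels the data; the status string is the command to execute.
        ps.start("Ps", "ps aux", 0L, DataFactory.getInstance().getEventQueue());
      }
    }
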
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import java.util.ArrayList;
+
+/**
+ * A subclass of FileTailingAdaptor that reads UTF8/ascii
+ * files and splits records at newlines ('\n').
+ *
+ */
+public class CharFileTailingAdaptorUTF8 extends FileTailingAdaptor {
+
+
+
+ private static final char SEPARATOR = '\n';
+
+ private ArrayList<Integer> offsets = new ArrayList<Integer>();
+
+ /**
+ *
+ * Note: this method uses a temporary ArrayList (reused across calls).
+ * This means we're copying ints each time. This could be a performance issue.
+ * Also, 'offsets' never shrinks, and will be of size proportional to the
+ * largest number of lines ever seen in an event.
+ */
+ @Override
+ protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
+ throws InterruptedException
+ {
+ for(int i = 0; i < buf.length; ++i) {
+ if(buf[i] == SEPARATOR) {
+ offsets.add(i);
+ }
+ }
+
+ if(offsets.size() > 0) {
+ int[] offsets_i = new int[offsets.size()];
+ for(int i = 0; i < offsets_i.length ; ++i)
+ offsets_i[i] = offsets.get(i);
+
+ int bytesUsed = offsets_i[offsets_i.length-1] + 1; //char at last offset uses a byte
+ assert bytesUsed > 0: " shouldn't send empty events";
+ ChunkImpl event = new ChunkImpl(type, toWatch.getAbsolutePath(),buffOffsetInFile + bytesUsed, buf, this );
+
+ event.setRecordOffsets(offsets_i);
+ eq.add(event);
+
+ offsets.clear();
+ return bytesUsed;
+ }
+ else
+ return 0;
+
+ }
+
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.util.RecordConstants;
+
+import java.util.ArrayList;
+
+
+/**
+ * A subclass of FileTailingAdaptor that reads UTF8/ascii
+ * files and splits records at non-escaped carriage returns
+ *
+ */
+public class CharFileTailingAdaptorUTF8NewLineEscaped extends FileTailingAdaptor {
+
+
+ private static final char SEPARATOR = '\n';
+
+ private ArrayList<Integer> offsets = new ArrayList<Integer>();
+
+ /**
+ *
+ * Note: this method uses a temporary ArrayList (reused across calls).
+ * This means we're copying ints each time. This could be a performance issue.
+ * Also, 'offsets' never shrinks, and will be of size proportional to the
+ * largest number of lines ever seen in an event.
+ */
+ @Override
+ protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
+ throws InterruptedException
+ {
+ String es = RecordConstants.RECORD_SEPARATOR_ESCAPE_SEQ;
+ for(int i = 0; i < buf.length; ++i) {
+ // if this is a separator
+ if(buf[i] == SEPARATOR){
+ // if possibly preceded by escape sequence (avoid outOfBounds here)
+ boolean escaped = false; // was it escaped?
+ if (i-es.length() >= 0){
+ escaped = true; // maybe (at least there was room for an escape sequence, so let's check for the e.s.)
+ for (int j = 0; j < es.length(); j++){
+ if (buf[i-es.length()+j] != es.charAt(j)){
+ escaped = false;
+ }
+ }
+ }
+ if (!escaped){
+ offsets.add(i);
+ }
+ }
+ }
+
+ if(offsets.size() > 0) {
+ int[] offsets_i = new int[offsets.size()];
+ for(int i = 0; i < offsets_i.length ; ++i)
+ offsets_i[i] = offsets.get(i);
+ //make the stream unique to this adaptor
+ int bytesUsed = offsets_i[offsets_i.length-1] + 1; //char at last offset uses a byte
+ assert bytesUsed > 0: " shouldn't send empty events";
+ ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
+ buffOffsetInFile + bytesUsed, buf,this);
+
+ chunk.setSeqID(buffOffsetInFile + bytesUsed);
+ chunk.setRecordOffsets(offsets_i);
+ eq.add(chunk);
+
+ offsets.clear();
+ return bytesUsed;
+ }
+ else
+ return 0;
+ }
+
+ public String toString() {
+ return "escaped newline CFTA-UTF8";
+ }
+
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+
+/**
+ * A shared thread used by all FileTailingAdaptors.
+ *
+ * For now, it tries each file in succession. If it gets through every
+ * file within two seconds, and no more data remains, it will sleep.
+ *
+ * If data was still available in any file, the thread loops again immediately.
+ *
+ */
+class FileTailer extends Thread {
+ private List<FileTailingAdaptor> adaptors;
+ ChunkQueue eq; //not private -- useful for file tailing adaptor classes
+
+ /**
+ * How often to tail each file.
+ */
+ int SAMPLE_PERIOD_MS = 1000 * 2; //FIXME: should be configurable
+
+ FileTailer() {
+ eq = DataFactory.getInstance().getEventQueue();
+
+ //iterations are much more common than adding a new adaptor
+ adaptors = new CopyOnWriteArrayList<FileTailingAdaptor>();
+
+ this.setDaemon(true);
+ start();//start the file-tailing thread
+ }
+
+ //called by FileTailingAdaptor, only
+ void startWatchingFile(FileTailingAdaptor f) {
+ adaptors.add(f);
+ }
+
+ //called by FileTailingAdaptor, only
+ void stopWatchingFile(FileTailingAdaptor f) {
+ adaptors.remove(f);
+ }
+
+ public void run() {
+ try{
+ while(true) {
+ boolean shouldISleep = true;
+ long startTime = System.currentTimeMillis();
+ for(FileTailingAdaptor f: adaptors) {
+ boolean hasMoreData = f.tailFile(eq);
+ shouldISleep &= !hasMoreData;
+ }
+ long timeToReadFiles = System.currentTimeMillis() - startTime;
+ assert timeToReadFiles >= 0 : " time shouldn't go backwards";
+ if(timeToReadFiles < SAMPLE_PERIOD_MS && shouldISleep)
+ Thread.sleep(SAMPLE_PERIOD_MS - timeToReadFiles+1);
+ }
+ }
+ catch(InterruptedException e)
+ {}
+ }
+
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.log4j.Logger;
+
+import java.io.*;
+
+/**
+ * An adaptor that repeatedly tails a specified file, sending the new bytes.
+ * This class does not split out records, but just sends everything up to end of file.
+ * Subclasses can alter this behavior by overriding extractRecords().
+ *
+ */
+public class FileTailingAdaptor implements Adaptor
+{
+
+ static Logger log;
+
+ /**
+ * This is the maximum amount we'll read from any one file before moving on
+ * to the next. This way, we get quick response time for other files if one
+ * file is growing rapidly.
+ */
+ public static final int MAX_READ_SIZE = 128 * 1024;
+
+ File toWatch;
+ /**
+ * next PHYSICAL offset to read
+ */
+ protected long fileReadOffset;
+ protected String type;
+ private ChunkReceiver dest;
+
+ /**
+ * The logical offset of the first byte of the file
+ */
+ private long offsetOfFirstByte = 0;
+
+ private static FileTailer tailer;
+
+ static {
+ tailer = new FileTailer();
+ log =Logger.getLogger(FileTailingAdaptor.class);
+ }
+
+ public void start(String type, String params, long bytes, ChunkReceiver dest) {
+ //in this case params = filename
+ log.info("started file tailer on file " + params);
+ this.type = type;
+ this.dest = dest;
+
+ String[] words = params.split(" ");
+ if(words.length > 1) {
+ offsetOfFirstByte = Long.parseLong(words[0]);
+ toWatch = new File(params.substring(words[0].length() + 1));
+ }
+ else
+ toWatch = new File(params);
+
+ this.fileReadOffset= bytes - offsetOfFirstByte;
+ tailer.startWatchingFile(this);
+ }
+
+ /**
+ * Do one last tail, and then stop
+ * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
+ */
+ public long shutdown() throws AdaptorException {
+ try{
+ tailFile(tailer.eq); // get tail end of file.
+ } catch(InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ hardStop();//need to do this afterwards, so that offset stays visible during tailFile().
+ return fileReadOffset + offsetOfFirstByte;
+ }
+ /**
+ * Stop tailing the file, effective immediately.
+ */
+ public void hardStop() throws AdaptorException {
+ tailer.stopWatchingFile(this);
+ }
+
+ /**
+ * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
+ */
+ public String getCurrentStatus() {
+ return type + " " + offsetOfFirstByte+ " " + toWatch.getPath();
+ // can make this more efficient using a StringBuilder
+ }
+
+ public String toString() {
+ return "Tailer on " + toWatch;
+ }
+
+  /**
+   * Looks at the tail of the associated file and adds some of it to the event queue.
+   * Declared synchronized, so concurrent calls on the same adaptor serialize.
+   *
+   * @param eq the queue to write Chunks into
+   * @return true if there is more data in the file
+   */
+  public synchronized boolean tailFile(ChunkReceiver eq) throws InterruptedException {
+    boolean hasMoreData = false;
+    RandomAccessFile reader = null;
+    try {
+      if(!toWatch.exists())
+        return false; //no more data
+
+      reader = new RandomAccessFile(toWatch, "r");
+      long len = reader.length();
+      if (len > fileReadOffset) {
+        reader.seek(fileReadOffset);
+
+        long bufSize = len - fileReadOffset;
+        if (bufSize > MAX_READ_SIZE) {
+          bufSize = MAX_READ_SIZE;
+          hasMoreData = true;
+        }
+        byte[] buf = new byte[(int) bufSize];
+        reader.readFully(buf); //readFully, since read() may legally return fewer bytes
+        assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
+            + " pointer is " + reader.getFilePointer()
+            + " but offset is " + (fileReadOffset + bufSize);
+
+        int bytesUsed = extractRecords(dest, fileReadOffset + offsetOfFirstByte, buf);
+        fileReadOffset = fileReadOffset + bytesUsed;
+      }
+    } catch (IOException e) {
+      log.warn("failure reading " + toWatch, e);
+    } finally {
+      if (reader != null) { //always release the file handle, even on error
+        try {
+          reader.close();
+        } catch (IOException e) {
+          log.warn("failure closing " + toWatch, e);
+        }
+      }
+    }
+    return hasMoreData;
+  }
+
+ /**
+ * Extract records from a byte sequence
+ * @param eq the queue to stick the new chunk[s] in
+ * @param buffOffsetInFile the byte offset in the stream at which buf[] begins
+ * @param buf the byte buffer to extract records from
+ * @return the number of bytes processed
+ * @throws InterruptedException
+ */
+ protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
+ throws InterruptedException
+ {
+ ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(), buffOffsetInFile + buf.length,
+ buf, this);
+
+ eq.add(chunk);
+ return buf.length;
+ }
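+
+  // Illustrative sketch, not part of this patch: a subclass could emit only whole
+  // lines by overriding extractRecords(), leaving a partial trailing line in the
+  // file for the next pass. Field names match this class; the logic is an example.
+  //
+  //   protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
+  //       throws InterruptedException {
+  //     int lastNewline = -1;
+  //     for (int i = 0; i < buf.length; ++i)
+  //       if (buf[i] == '\n')
+  //         lastNewline = i;
+  //     if (lastNewline == -1)
+  //       return 0; // no complete record yet; consume nothing
+  //     byte[] record = new byte[lastNewline + 1];
+  //     System.arraycopy(buf, 0, record, 0, lastNewline + 1);
+  //     eq.add(new ChunkImpl(type, toWatch.getAbsolutePath(),
+  //         buffOffsetInFile + record.length, record, this));
+  //     return lastNewline + 1;
+  //   }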
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+
+/**
+ * Produces new unconfigured adaptors, given the class name of the adaptor type.
+ *
+ */
+public class AdaptorFactory {
+
+ /**
+ * Instantiate an adaptor that can be added by the {@link ChukwaAgent}
+ * @param className the name of the {@link Adaptor} class to instantiate
+ * @return an Adaptor of the specified type
+ */
+  static Adaptor createAdaptor(String className){
+    Object obj = null;
+    try{
+      //the reflection-based type check below is arguably unnecessary,
+      //since a bad cast would throw ClassCastException anyway.
+      obj = Class.forName(className).newInstance();
+      if (Adaptor.class.isInstance(obj)){
+        return (Adaptor) obj;
+      }
+      else return null;
+    } catch (Exception e){
+      System.out.println("Error instantiating new adaptor of class " + className + ": " + e);
+      return null;
+    }
+  }
+
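+  // Illustrative use (the caller is expected to be ChukwaAgent):
+  //   Adaptor a = AdaptorFactory.createAdaptor(
+  //       "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor");
+  //   // a is null if the class can't be loaded or isn't an Adaptor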
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+import java.net.*;
+import java.io.*;
+
+import org.apache.hadoop.chukwa.datacollection.adaptor.*;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.log4j.Logger;
+import java.util.Map;
+
+/**
+ * Class to handle the agent control protocol.
+ * This is a simple line-oriented ASCII protocol designed to be easy to
+ * work with both programmatically and via telnet.
+ *
+ * The port to bind to can be specified by setting option
+ * chukwaAgent.agent.control.port
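+ *
+ * A minimal illustrative session (assuming the default port, 9093):
+ *   $ telnet localhost 9093
+ *   help     -- print the command list
+ *   list     -- list running adaptors
+ *   close    -- close the connection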
+ */
+public class AgentControlSocketListener extends Thread {
+
+
+  static Logger log = Logger.getLogger(AgentControlSocketListener.class);
+
+ ChukwaAgent agent;
+ int portno;
+  ServerSocket s = null;
+ boolean closing = false;
+
+ private class ListenThread extends Thread
+ {
+ Socket connection;
+ ListenThread(Socket conn) {
+ connection = conn;
+ this.setName("listen thread for " + connection.getRemoteSocketAddress());
+ }
+
+ public void run() {
+ try {
+ InputStream in = connection.getInputStream();
+ BufferedReader br = new BufferedReader(new InputStreamReader(in));
+ PrintStream out = new PrintStream(new BufferedOutputStream(connection.getOutputStream()));
+ String cmd = null;
+ while((cmd = br.readLine()) != null) {
+ processCommand(cmd, out);
+ }
+ log.info("control connection closed");
+ }
+      catch(SocketException e) {
+        if("Socket Closed".equals(e.getMessage())) //null-safe comparison
+          log.info("control socket closed");
+        else
+          log.warn("socket error on control connection", e);
+      } catch(IOException e) {
+        log.warn("a control connection broke", e);
+      }
+ }
+
+ /**
+ * process a protocol command
+ * @param cmd the command given by the user
+ * @param out a PrintStream writing to the socket
+ * @throws IOException
+ */
+ public void processCommand(String cmd, PrintStream out) throws IOException {
+ String[] words = cmd.split(" ");
+ log.info("command from " + connection.getRemoteSocketAddress() + ":"+ cmd);
+
+ if(words[0].equalsIgnoreCase("help")) {
+ out.println("you're talking to the Chukwa agent. Commands available: ");
+ out.println("add [adaptorname] [args] [offset] -- start an adaptor");
+ out.println("shutdown [adaptornumber] -- graceful stop");
+ out.println("stop [adaptornumber] -- abrupt stop");
+ out.println("list -- list running adaptors");
+ out.println("close -- close this connection");
+ out.println("stopagent -- stop the whole agent process");
+ out.println("help -- print this message");
+ out.println("\t Command names are case-blind.");
+ }
+ else if(words[0].equalsIgnoreCase("close")) {
+ connection.close();
+ }
+ else if(words[0].equalsIgnoreCase("add")) {
+ long newID = agent.processCommand(cmd);
+ if(newID != -1)
+ out.println("OK add completed; new ID is " +newID);
+ else
+ out.println("failed to start adaptor...check logs for details");
+ }
+ else if(words[0].equalsIgnoreCase("shutdown")) {
+ if(words.length < 2) {
+ out.println("need to specify an adaptor to shut down, by number");
+ }
+ else {
+ long num = Long.parseLong(words[1]);
+ long offset = agent.stopAdaptor(num, true);
+ if(offset != -1)
+ out.println("OK adaptor "+ num+ " stopping gracefully at " + offset);
+ else
+ out.println("FAIL: perhaps adaptor " + num + " does not exist");
+ }
+ }
+ else if(words[0].equalsIgnoreCase("stop")) {
+ if(words.length < 2) {
+ out.println("need to specify an adaptor to shut down, by number");
+ } else {
+ long num = Long.parseLong(words[1]);
+ agent.stopAdaptor(num, false);
+ out.println("OK adaptor "+ num+ " stopped");
+ }
+ }
+ else if(words[0].equalsIgnoreCase("list") ) {
+ Map<Long, Adaptor> adaptorsByNumber = agent.getAdaptorList();
+ System.out.println("number of adaptors: " + adaptorsByNumber.size());
+ synchronized(adaptorsByNumber) {
+ for(Map.Entry<Long, Adaptor> a: adaptorsByNumber.entrySet()) {
+ try{
+ out.print(a.getKey());
+ out.print(") ");
+ out.print(" ");
+ out.println(formatAdaptorStatus(a.getValue()));
+ } catch(AdaptorException e) {
+ log.error(e);
+ }
+ }
+ out.println("");
+ }
+ } else if(words[0].equalsIgnoreCase("stopagent")) {
+ out.println("stopping agent process.");
+ connection.close();
+ agent.shutdown(true);
+ }
+ else {
+ log.warn("unknown command " + words[0]);
+ out.println("unknown command" + words[0]);
+ out.println("say 'help' for a list of legal commands");
+ }
+ out.flush();
+ }
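+
+    // Note: commands that change agent state acknowledge success with a line
+    // starting "OK", which keeps programmatic use of the protocol simple.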
+
+ }
+ /**
+ * Initializes listener, but does not bind to socket.
+ * @param a the agent to control
+ */
+ public AgentControlSocketListener(ChukwaAgent a)
+ {
+ ChukwaConfiguration conf = new ChukwaConfiguration();
+ this.setDaemon(false); //to keep the local agent alive
+ agent = a;
+ portno = conf.getInt("chukwaAgent.agent.control.port", 9093);
+ log.info("AgentControlSocketListerner port set to " + portno);
+ this.setName("control socket listener");
+ }
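+
+  // The port can be overridden in the agent's configuration, e.g. (illustrative):
+  //   <property>
+  //     <name>chukwaAgent.agent.control.port</name>
+  //     <value>9093</value>
+  //   </property>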
+
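+  // Formats one "list" entry; yields "<adaptor class> <adaptor status> <offset>"
+  // (for a FileTailingAdaptor the status part is "type offsetOfFirstByte path").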
+ public String formatAdaptorStatus(Adaptor a) throws AdaptorException {
+ return a.getClass().getCanonicalName() + " " + a.getCurrentStatus() + " " + agent.getOffset(a);
+ }
+
+ /**
+   * Binds to the socket, then loops, listening for commands.
+ */
+ public void run() {
+ try {
+ if(!isBound())
+ tryToBind();
+ } catch(IOException e) {
+      log.error("failed to bind agent control socket; control thread exiting", e);
+      return;
+ }
+
+ while(!closing)
+ {
+ try {
+ Socket connection = s.accept();
+ log.info("new connection from " + connection.getInetAddress());
+ ListenThread l = new ListenThread(connection);
+ l.setDaemon(true);
+ l.start();
+ } catch(IOException e) {
+ if(!closing)
+ log.warn("control socket error: ",e );
+ else {
+ log.info("shutting down listen thread due to shutdown() call");
+ break;
+ }
+ }
+ }//end while
+ }
+ /**
+ * Close the control socket, and exit. Triggers graceful thread shutdown.
+ */
+ public void shutdown() {
+ closing = true;
+ try{
+ if(s != null)
+ s.close();
+ s = null;
+ }
+ catch(IOException e)
+ {} //ignore exception on close
+ }
+
+ public boolean isBound() {
+ return s!= null && s.isBound();
+ }
+
+  public void tryToBind() throws IOException {
+    s = new ServerSocket(portno);
+    if(s.isBound())
+      log.debug("socket bound to " + portno);
+    else
+      log.debug("socket isn't bound");
+  }
+
+}