Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/12/05 21:30:21 UTC
svn commit: r723855 [9/23] - in /hadoop/core/trunk: ./ src/contrib/
src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/conf/
src/contrib/chukwa/docs/ src/contrib/chukwa/docs/paper/
src/contrib/chukwa/hadoop-packaging/ src/contrib/chukwa/lib...
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java Fri Dec 5 12:30:14 2008
@@ -0,0 +1,81 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class MRJobReduceProcessor implements ReduceProcessor
+{
+ static Logger log = Logger.getLogger(MRJobReduceProcessor.class);
+ @Override
+ public String getDataType()
+ {
+ return MRJobReduceProcessor.class.getName();
+ }
+
+ @Override
+ public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+ Reporter reporter)
+ {
+ try
+ {
+ HashMap<String, String> data = new HashMap<String, String>();
+
+ ChukwaRecord record = null;
+ String[] fields = null;
+ while(values.hasNext())
+ {
+ record = values.next();
+ fields = record.getFields();
+ for(String field: fields)
+ {
+ data.put(field, record.getValue(field));
+ }
+ }
+
+ //Extract initial time: SUBMIT_TIME
+ long initTime = Long.parseLong(data.get("SUBMIT_TIME"));
+
+ // Extract HodId
+ // maybe use a regex to extract this and load it from configuration
+ // JOBCONF="/user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml"
+ String jobConf = data.get("JOBCONF");
+ int idx = jobConf.indexOf("mapredsystem/");
+ idx += "mapredsystem/".length(); // skip past the prefix to the start of the HodId
+ int idx2 = jobConf.indexOf(".", idx);
+ data.put("HodId", jobConf.substring(idx, idx2));
+
+ ChukwaRecordKey newKey = new ChukwaRecordKey();
+ newKey.setKey(""+initTime);
+ newKey.setReduceType("MRJob");
+
+ ChukwaRecord newRecord = new ChukwaRecord();
+ newRecord.setTime(initTime);
+ newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
+ Iterator<String> it = data.keySet().iterator();
+ while(it.hasNext())
+ {
+ String field = it.next();
+ newRecord.add(field, data.get(field));
+ }
+
+ output.collect(newKey, newRecord);
+ }
+ catch (IOException e)
+ {
+ log.warn("Unable to collect output in MRJobReduceProcessor [" + key + "]", e);
+ e.printStackTrace();
+ }
+
+ }
+
+}
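The TODO in process() suggests replacing the index arithmetic with a regular expression. A minimal sketch of that alternative (the pattern, class name, and null fallback are illustrative assumptions, not part of this commit):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: pulls the HodId out of a JOBCONF path such as
// /user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml
public class HodIdExtractor
{
  private static final Pattern HOD_ID = Pattern.compile("mapredsystem/([^./]+)\\.");

  public static String extract(String jobConf)
  {
    Matcher m = HOD_ID.matcher(jobConf);
    return m.find() ? m.group(1) : null; // null when the path layout differs
  }
}

Unlike the indexOf() arithmetic above, this returns null instead of throwing StringIndexOutOfBoundsException when the path layout changes.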
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java Fri Dec 5 12:30:14 2008
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public interface ReduceProcessor
+{
+ public String getDataType();
+ public void process(ChukwaRecordKey key,Iterator<ChukwaRecord> values,
+ OutputCollector<ChukwaRecordKey,
+ ChukwaRecord> output, Reporter reporter);
+}
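ReduceProcessorFactory (added below) resolves a processor by prepending this package name to the reduceType string, so an implementation only needs to live in this package under the name of its type. A minimal pass-through sketch (hypothetical class; the actual fallback the factory relies on is IdentityReducer):

// Hypothetical processor for a reduceType of "MyType": emits records unchanged.
public class MyType implements ReduceProcessor
{
  public String getDataType()
  {
    return MyType.class.getName();
  }

  public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
  {
    try
    {
      while (values.hasNext())
      {
        output.collect(key, values.next());
      }
    }
    catch (java.io.IOException e)
    {
      // log and drop this key, matching the behavior of the processors below
    }
  }
}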
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java Fri Dec 5 12:30:14 2008
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+
+
+
+public class ReduceProcessorFactory
+{
+ static Logger log = Logger.getLogger(ReduceProcessorFactory.class);
+
+ // TODO
+ // Add new reducer packages at the end.
+ // We should have a more generic way to do this,
+ // e.g. read the list of aliases and the
+ // alias -> processor class mapping from config.
+
+ // ******** WARNING ********
+ // If the ReduceProcessor is not there use Identity instead
+
+
+ private static HashMap<String,ReduceProcessor > processors =
+ new HashMap<String, ReduceProcessor>(); // registry
+
+ private ReduceProcessorFactory()
+ {}
+
+ public static ReduceProcessor getProcessor(String reduceType)
+ throws UnknownReduceTypeException
+ {
+ String path = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer."+reduceType;
+ if (processors.containsKey(reduceType)) {
+ return processors.get(reduceType);
+ } else {
+ ReduceProcessor processor = null;
+ try {
+ processor = (ReduceProcessor)Class.forName(path).getConstructor().newInstance();
+ }
+ catch(ClassNotFoundException e)
+ {
+ // ******** WARNING ********
+ // If the ReduceProcessor is not there use Identity instead
+ processor = getProcessor("IdentityReducer");
+ register(reduceType,processor);
+ return processor;
+ }
+ catch(Exception e) {
+ throw new UnknownReduceTypeException("error constructing processor", e);
+ }
+
+ //TODO using a ThreadSafe/reuse flag to actually decide if we want
+ // to reuse the same processor again and again
+ register(reduceType,processor);
+ return processor;
+ }
+ }
+
+ /** Register a specific {@link ReduceProcessor}
+ * implementation for a given reduce type. */
+ public static synchronized void register(String reduceType,
+ ReduceProcessor processor)
+ {
+ log.info("register " + processor.getClass().getName() + " for this recordType :" + reduceType);
+ if (processors.containsKey(reduceType))
+ {
+ throw new DuplicateReduceProcessorException("Duplicate processor for recordType:" + reduceType);
+ }
+ ReduceProcessorFactory.processors.put(reduceType, processor);
+ }
+
+}
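A sketch of the caller side, using the SystemMetrics processor added later in this commit:

try
{
  ReduceProcessor processor = ReduceProcessorFactory.getProcessor("SystemMetrics");
  // cached in the registry, so repeated lookups return the same instance
}
catch (UnknownReduceTypeException e)
{
  // only thrown when a matching class exists but cannot be constructed;
  // a completely unknown reduceType silently falls back to IdentityReducer
}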
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java Fri Dec 5 12:30:14 2008
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class SystemMetrics implements ReduceProcessor {
+ static Logger log = Logger.getLogger(SystemMetrics.class);
+ @Override
+ public String getDataType() {
+ return this.getClass().getName();
+ }
+
+ @Override
+ public void process(ChukwaRecordKey key,
+ Iterator<ChukwaRecord> values,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+ Reporter reporter) {
+ try {
+
+ ChukwaRecord record = null;
+ ChukwaRecord newRecord = new ChukwaRecord();
+
+ while(values.hasNext()) {
+ record = values.next();
+ newRecord.setTime(record.getTime());
+
+ if(record.containsField("IFACE")) {
+ if(record.containsField("rxpck/s")) {
+ if (record.containsField("rxbyt/s") && record.containsField("txbyt/s")) {
+ double netBusyPcnt=0, netRxByts=0, netTxByts=0, netSpeed=128000000.00;
+ netRxByts=Double.parseDouble(record.getValue("rxbyt/s"));
+ netTxByts=Double.parseDouble(record.getValue("txbyt/s"));
+ netBusyPcnt=(netRxByts/netSpeed*100)+(netTxByts/netSpeed*100);
+ record.add(record.getValue("IFACE")+"_busy_pcnt", "" + netBusyPcnt);
+ record.add("csource", record.getValue("csource"));
+ }
+ record.add(record.getValue("IFACE")+".rxbyt/s", record.getValue("rxbyt/s"));
+ record.add(record.getValue("IFACE")+".rxpck/s", record.getValue("rxpck/s"));
+ record.add(record.getValue("IFACE")+".txbyt/s", record.getValue("txbyt/s"));
+ record.add(record.getValue("IFACE")+".txpck/s", record.getValue("txpck/s"));
+ record.removeValue("rxbyt/s");
+ record.removeValue("rxpck/s");
+ record.removeValue("txbyt/s");
+ record.removeValue("txpck/s");
+ }
+ if(record.containsField("rxerr/s")) {
+ record.add(record.getValue("IFACE")+".rxerr/s", record.getValue("rxerr/s"));
+ record.add(record.getValue("IFACE")+".rxdrop/s", record.getValue("rxdrop/s"));
+ record.add(record.getValue("IFACE")+".txerr/s", record.getValue("txerr/s"));
+ record.add(record.getValue("IFACE")+".txdrop/s", record.getValue("txdrop/s"));
+ record.removeValue("rxerr/s");
+ record.removeValue("rxdrop/s");
+ record.removeValue("txerr/s");
+ record.removeValue("txdrop/s");
+ }
+ record.removeValue("IFACE");
+ }
+
+ if(record.containsField("Device:")) {
+ record.add(record.getValue("Device:")+".rkB/s", record.getValue("rkB/s"));
+ record.add(record.getValue("Device:")+".wkB/s", record.getValue("wkB/s"));
+ record.add(record.getValue("Device:")+".%util", record.getValue("%util"));
+ record.removeValue("rkB/s");
+ record.removeValue("wkB/s");
+ record.removeValue("%util");
+ record.removeValue("Device:");
+ }
+
+ if (record.containsField("swap_free")) {
+ float swapUsedPcnt=0, swapUsed=0, swapTotal=0;
+ swapUsed=Long.parseLong(record.getValue("swap_used"));
+ swapTotal=Long.parseLong(record.getValue("swap_total"));
+ swapUsedPcnt=swapUsed/swapTotal*100;
+ record.add("swap_used_pcnt", "" + swapUsedPcnt);
+ record.add("csource", record.getValue("csource"));
+ }
+
+ if (record.containsField("mem_used")) {
+ double memUsedPcnt=0, memTotal=0, memUsed=0;
+ memTotal=Double.parseDouble(record.getValue("mem_total"));
+ memUsed=Double.parseDouble(record.getValue("mem_used"));
+ memUsedPcnt=memUsed/memTotal*100;
+ record.add("mem_used_pcnt", "" + memUsedPcnt);
+ record.add("csource", record.getValue("csource"));
+ }
+
+ if (record.containsField("mem_buffers")) {
+ double memBuffersPcnt=0, memTotal=0, memBuffers=0;
+ memTotal=Double.parseDouble(record.getValue("mem_total"));
+ memBuffers=Double.parseDouble(record.getValue("mem_buffers"));
+ memBuffersPcnt=memBuffers/memTotal*100;
+ record.add("mem_buffers_pcnt", "" + memBuffersPcnt);
+ record.add("csource", record.getValue("csource"));
+ }
+
+ // Copy over all fields
+ String[] fields = record.getFields();
+ for(String f: fields){
+ newRecord.add(f, record.getValue(f));
+ }
+ }
+ record.add("capp", "systemMetrics");
+ output.collect(key, newRecord);
+ } catch (IOException e) {
+ log.warn("Unable to collect output in SystemMetricsReduceProcessor [" + key + "]", e);
+ e.printStackTrace();
+ }
+
+ }
+}
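As a worked example of the interface math above: with rxbyt/s = 16,000,000 and txbyt/s = 48,000,000 against the hardcoded netSpeed of 128,000,000 bytes/s (roughly a 1 Gbit/s link), the record gains eth0_busy_pcnt = (16e6/128e6)*100 + (48e6/128e6)*100 = 12.5 + 37.5 = 50.0 when IFACE is eth0.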
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java Fri Dec 5 12:30:14 2008
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+public class UnknownReduceTypeException extends Exception
+{
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 5760553864088487836L;
+
+ public UnknownReduceTypeException()
+ {
+ }
+
+ public UnknownReduceTypeException(String message)
+ {
+ super(message);
+ }
+
+ public UnknownReduceTypeException(Throwable cause)
+ {
+ super(cause);
+ }
+
+ public UnknownReduceTypeException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+}
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java Fri Dec 5 12:30:14 2008
@@ -67,6 +67,12 @@
return this.mapFields.containsKey(field);
}
+ public void removeValue(String field) {
+ if(this.mapFields.containsKey(field)) {
+ this.mapFields.remove(field);
+ }
+ }
+
@Override
public String toString()
{
@@ -76,36 +82,37 @@
Map.Entry<String,Buffer> entry = null;
StringBuilder sb = new StringBuilder();
sb.append("<event ");
- String body = null;
+ StringBuilder body = new StringBuilder();
+
String key = null;
String val = null;
-
+ boolean hasBody = false;
+ String bodyVal = null;
while (it.hasNext())
{
entry = it.next();
key = entry.getKey().intern();
val = new String(entry.getValue().get());
- if (key == Record.rawField.intern())
+ if (key.intern() == Record.bodyField.intern())
{
- continue;
- }
-
- if (key == Record.bodyField.intern())
- {
- body = val;
+ hasBody = true;
+ bodyVal = val;
}
else
{
- sb.append(entry.getKey()).append("=\"").append(val).append("\" ");
+ sb.append(key).append("=\"").append(val).append("\" ");
+ body.append(key).append( " = ").append(val).append("<br>");
}
+
+
}
- sb.append(">").append(body);
+ if (hasBody)
+ { sb.append(">").append(bodyVal);}
+ else
+ { sb.append(">").append(body);}
sb.append("</event>");
return sb.toString();
-// //<event start="Jun 15 2008 00:00:00" end="Jun 15 2008 12:00:00" title="hello" link="/here">body</event>
-// return "<event start=\"" + formatter.format(new Date(this.getTime())) + "\" title=\""
-// + this.getValue(Record.sourceField) + "\" >" + this.getValue(Record.bodyField) + "</event>" ;
}
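With this change, a record carrying fields host=node1 and user=alice but no body field renders roughly as follows (sketch; actual attribute order depends on the field map's iteration order):

<event host="node1" user="alice" >host = node1<br>user = alice<br></event>

A record that does contain a body field keeps the previous behavior and emits the body value between the tags instead of the generated <br>-separated listing.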
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java Fri Dec 5 12:30:14 2008
@@ -0,0 +1,212 @@
+// File generated by hadoop record compiler. Do not edit.
+package org.apache.hadoop.chukwa.extraction.engine;
+
+public class ChukwaRecordKey extends org.apache.hadoop.record.Record {
+ private static final org.apache.hadoop.record.meta.RecordTypeInfo _rio_recTypeInfo;
+ private static org.apache.hadoop.record.meta.RecordTypeInfo _rio_rtiFilter;
+ private static int[] _rio_rtiFilterFields;
+ static {
+ _rio_recTypeInfo = new org.apache.hadoop.record.meta.RecordTypeInfo("ChukwaRecordKey");
+ _rio_recTypeInfo.addField("reduceType", org.apache.hadoop.record.meta.TypeID.StringTypeID);
+ _rio_recTypeInfo.addField("key", org.apache.hadoop.record.meta.TypeID.StringTypeID);
+ }
+
+ private String reduceType;
+ private String key;
+ public ChukwaRecordKey() { }
+ public ChukwaRecordKey(
+ final String reduceType,
+ final String key) {
+ this.reduceType = reduceType;
+ this.key = key;
+ }
+ public static org.apache.hadoop.record.meta.RecordTypeInfo getTypeInfo() {
+ return _rio_recTypeInfo;
+ }
+ public static void setTypeFilter(org.apache.hadoop.record.meta.RecordTypeInfo rti) {
+ if (null == rti) return;
+ _rio_rtiFilter = rti;
+ _rio_rtiFilterFields = null;
+ }
+ private static void setupRtiFields()
+ {
+ if (null == _rio_rtiFilter) return;
+ // we may already have done this
+ if (null != _rio_rtiFilterFields) return;
+ int _rio_i, _rio_j;
+ _rio_rtiFilterFields = new int [_rio_rtiFilter.getFieldTypeInfos().size()];
+ for (_rio_i=0; _rio_i<_rio_rtiFilterFields.length; _rio_i++) {
+ _rio_rtiFilterFields[_rio_i] = 0;
+ }
+ java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_itFilter = _rio_rtiFilter.getFieldTypeInfos().iterator();
+ _rio_i=0;
+ while (_rio_itFilter.hasNext()) {
+ org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfoFilter = _rio_itFilter.next();
+ java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_it = _rio_recTypeInfo.getFieldTypeInfos().iterator();
+ _rio_j=1;
+ while (_rio_it.hasNext()) {
+ org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfo = _rio_it.next();
+ if (_rio_tInfo.equals(_rio_tInfoFilter)) {
+ _rio_rtiFilterFields[_rio_i] = _rio_j;
+ break;
+ }
+ _rio_j++;
+ }
+ _rio_i++;
+ }
+ }
+ public String getReduceType() {
+ return reduceType;
+ }
+ public void setReduceType(final String reduceType) {
+ this.reduceType=reduceType;
+ }
+ public String getKey() {
+ return key;
+ }
+ public void setKey(final String key) {
+ this.key=key;
+ }
+ public void serialize(final org.apache.hadoop.record.RecordOutput _rio_a, final String _rio_tag)
+ throws java.io.IOException {
+ _rio_a.startRecord(this,_rio_tag);
+ _rio_a.writeString(reduceType,"reduceType");
+ _rio_a.writeString(key,"key");
+ _rio_a.endRecord(this,_rio_tag);
+ }
+ private void deserializeWithoutFilter(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
+ throws java.io.IOException {
+ _rio_a.startRecord(_rio_tag);
+ reduceType=_rio_a.readString("reduceType");
+ key=_rio_a.readString("key");
+ _rio_a.endRecord(_rio_tag);
+ }
+ public void deserialize(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
+ throws java.io.IOException {
+ if (null == _rio_rtiFilter) {
+ deserializeWithoutFilter(_rio_a, _rio_tag);
+ return;
+ }
+ // if we're here, we need to read based on version info
+ _rio_a.startRecord(_rio_tag);
+ setupRtiFields();
+ for (int _rio_i=0; _rio_i<_rio_rtiFilter.getFieldTypeInfos().size(); _rio_i++) {
+ if (1 == _rio_rtiFilterFields[_rio_i]) {
+ reduceType=_rio_a.readString("reduceType");
+ }
+ else if (2 == _rio_rtiFilterFields[_rio_i]) {
+ key=_rio_a.readString("key");
+ }
+ else {
+ java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo> typeInfos = (java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo>)(_rio_rtiFilter.getFieldTypeInfos());
+ org.apache.hadoop.record.meta.Utils.skip(_rio_a, typeInfos.get(_rio_i).getFieldID(), typeInfos.get(_rio_i).getTypeID());
+ }
+ }
+ _rio_a.endRecord(_rio_tag);
+ }
+ public int compareTo (final Object _rio_peer_) throws ClassCastException {
+ if (!(_rio_peer_ instanceof ChukwaRecordKey)) {
+ throw new ClassCastException("Comparing different types of records.");
+ }
+ ChukwaRecordKey _rio_peer = (ChukwaRecordKey) _rio_peer_;
+ int _rio_ret = 0;
+ _rio_ret = reduceType.compareTo(_rio_peer.reduceType);
+ if (_rio_ret != 0) return _rio_ret;
+ _rio_ret = key.compareTo(_rio_peer.key);
+ if (_rio_ret != 0) return _rio_ret;
+ return _rio_ret;
+ }
+ public boolean equals(final Object _rio_peer_) {
+ if (!(_rio_peer_ instanceof ChukwaRecordKey)) {
+ return false;
+ }
+ if (_rio_peer_ == this) {
+ return true;
+ }
+ ChukwaRecordKey _rio_peer = (ChukwaRecordKey) _rio_peer_;
+ boolean _rio_ret = false;
+ _rio_ret = reduceType.equals(_rio_peer.reduceType);
+ if (!_rio_ret) return _rio_ret;
+ _rio_ret = key.equals(_rio_peer.key);
+ if (!_rio_ret) return _rio_ret;
+ return _rio_ret;
+ }
+ public Object clone() throws CloneNotSupportedException {
+ ChukwaRecordKey _rio_other = new ChukwaRecordKey();
+ _rio_other.reduceType = this.reduceType;
+ _rio_other.key = this.key;
+ return _rio_other;
+ }
+ public int hashCode() {
+ int _rio_result = 17;
+ int _rio_ret;
+ _rio_ret = reduceType.hashCode();
+ _rio_result = 37*_rio_result + _rio_ret;
+ _rio_ret = key.hashCode();
+ _rio_result = 37*_rio_result + _rio_ret;
+ return _rio_result;
+ }
+ public static String signature() {
+ return "LChukwaRecordKey(ss)";
+ }
+ public static class Comparator extends org.apache.hadoop.record.RecordComparator {
+ public Comparator() {
+ super(ChukwaRecordKey.class);
+ }
+ static public int slurpRaw(byte[] b, int s, int l) {
+ try {
+ int os = s;
+ {
+ int i = org.apache.hadoop.record.Utils.readVInt(b, s);
+ int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+ s+=(z+i); l-= (z+i);
+ }
+ {
+ int i = org.apache.hadoop.record.Utils.readVInt(b, s);
+ int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+ s+=(z+i); l-= (z+i);
+ }
+ return (os - s);
+ } catch(java.io.IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ static public int compareRaw(byte[] b1, int s1, int l1,
+ byte[] b2, int s2, int l2) {
+ try {
+ int os1 = s1;
+ {
+ int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1);
+ int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2);
+ int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+ int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+ s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+ int r1 = org.apache.hadoop.record.Utils.compareBytes(b1,s1,i1,b2,s2,i2);
+ if (r1 != 0) { return (r1<0)?-1:0; }
+ s1+=i1; s2+=i2; l1-=i1; l1-=i2;
+ }
+ {
+ int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1);
+ int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2);
+ int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+ int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+ s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+ int r1 = org.apache.hadoop.record.Utils.compareBytes(b1,s1,i1,b2,s2,i2);
+ if (r1 != 0) { return (r1<0)?-1:0; }
+ s1+=i1; s2+=i2; l1-=i1; l1-=i2;
+ }
+ return (os1 - s1);
+ } catch(java.io.IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ public int compare(byte[] b1, int s1, int l1,
+ byte[] b2, int s2, int l2) {
+ int ret = compareRaw(b1,s1,l1,b2,s2,l2);
+ return (ret == -1)? -1 : ((ret==0)? 1 : 0);}
+ }
+
+ static {
+ org.apache.hadoop.record.RecordComparator.define(ChukwaRecordKey.class, new Comparator());
+ }
+}
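The generated compareTo() orders keys by reduceType first and key second, so records sort by reduce type before their (typically timestamp-based) keys:

ChukwaRecordKey a = new ChukwaRecordKey("MRJob", "1228512614000");
ChukwaRecordKey b = new ChukwaRecordKey("SystemMetrics", "1111111111000");
// a.compareTo(b) < 0: "MRJob" sorts before "SystemMetrics"
// even though b's key string is smaller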
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java Fri Dec 5 12:30:14 2008
@@ -22,10 +22,12 @@
import java.util.TreeMap;
+
public class ChukwaSearchResult implements SearchResult
{
private TreeMap<Long, List<Record>> records;
-
+ private Token token = null;
+
public TreeMap<Long, List<Record>> getRecords()
{
return records;
@@ -35,5 +37,15 @@
{
this.records = records;
}
+
+ public Token getToken()
+ {
+ return token;
+ }
+
+ public void setToken(Token token)
+ {
+ this.token = token;
+ }
}
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java Fri Dec 5 12:30:14 2008
@@ -29,7 +29,7 @@
{
private DataSourceFactory dataSourceFactory = DataSourceFactory.getInstance();
- public SearchResult search(String cluster,String[] dataSources,long t0,long t1,String filter)
+ public SearchResult search(String cluster,String[] dataSources,long t0,long t1,String filter,Token token)
throws DataSourceException
{
SearchResult result = new ChukwaSearchResult();
@@ -40,7 +40,7 @@
for(int i=0;i<dataSources.length;i++)
{
DataSource ds = dataSourceFactory.getDataSource(dataSources[i]);
- ds.search(result, cluster, dataSources[i], t0, t1, filter);
+ ds.search(result, cluster, dataSources[i], t0, t1, filter,token);
}
return result;
}
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Record.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Record.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Record.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Record.java Fri Dec 5 12:30:14 2008
@@ -21,17 +21,21 @@
public interface Record
{
public static final String bodyField = "body";
-
+ public static final String sourceField = "csource";
+ public static final String applicationField = "capp";
+ public static final String tagsField = "ctags";
+ public static final String chunkDataField = "cchunkData";
+ public static final String chunkExceptionField = "cchunkException";
+
+ public static final String classField = "class";
public static final String logLevelField = "logLevel";
- public static final String destinationField = "dest";
- public static final String dataSourceField = "ds";
- public static final String sourceField = "src";
- public static final String streamNameField = "sname";
- public static final String typeField = "type";
- public static final String classField = "pkg";
- public static final String rawField = "raw";
- public static final String fieldSeparator = ":";
+// public static final String streamNameField = "sname";
+// public static final String typeField = "type";
+
+// public static final String rawField = "raw";
+
+// public static final String fieldSeparator = ":";
public long getTime();
public void add(String key, String value);
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java Fri Dec 5 12:30:14 2008
@@ -0,0 +1,23 @@
+package org.apache.hadoop.chukwa.extraction.engine;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class RecordUtil
+{
+ static Pattern clusterPattern = Pattern.compile("(.*)?cluster=\"(.*?)\"(.*)?");
+ public static String getClusterName(Record record)
+ {
+ String tags = record.getValue(Record.tagsField);
+ if (tags!= null)
+ {
+ Matcher matcher = clusterPattern.matcher(tags);
+ if (matcher.matches())
+ {
+ return matcher.group(2);
+ }
+ }
+
+ return "undefined";
+ }
+}
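A quick illustration of the tag lookup (the tags value is shaped after the cluster="..." pattern the regex expects):

ChukwaRecord r = new ChukwaRecord();
r.add(Record.tagsField, "cluster=\"demo\"");
String name = RecordUtil.getClusterName(r); // "demo"; "undefined" if no cluster tag is present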
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchResult.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchResult.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchResult.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchResult.java Fri Dec 5 12:30:14 2008
@@ -23,8 +23,11 @@
+
public interface SearchResult
{
+ public void setToken(Token token);
+ public Token getToken();
public TreeMap<Long, List<Record>> getRecords();
public void setRecords(TreeMap<Long, List<Record>> records);
}
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java Fri Dec 5 12:30:14 2008
@@ -23,6 +23,6 @@
public interface SearchService
{
- public SearchResult search(String cluster,String[] dataSources,long t0,long t1,String filter)
+ public SearchResult search(String cluster,String[] dataSources,long t0,long t1,String filter,Token token)
throws DataSourceException;
}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Token.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Token.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Token.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Token.java Fri Dec 5 12:30:14 2008
@@ -0,0 +1,7 @@
+package org.apache.hadoop.chukwa.extraction.engine;
+
+public class Token
+{
+ public String key = null;
+ public boolean hasMore = false;
+}
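Token is the paging cursor threaded through the search APIs in this commit; a sketch of the client loop it enables (searchService and the argument values are placeholders):

Token token = null;
do
{
  SearchResult result =
      searchService.search(cluster, dataSources, t0, t1, filter, token);
  // ... consume result.getRecords() ...
  token = result.getToken();
}
while (token != null && token.hasMore);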
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java Fri Dec 5 12:30:14 2008
@@ -19,6 +19,7 @@
package org.apache.hadoop.chukwa.extraction.engine.datasource;
import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
+import org.apache.hadoop.chukwa.extraction.engine.Token;
@@ -28,7 +29,8 @@
public SearchResult
search( SearchResult result,String cluster,String dataSource,
long t0,long t1,
- String filter)
+ String filter,
+ Token token)
throws DataSourceException;
public boolean isThreadSafe();
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSourceFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSourceFactory.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSourceFactory.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSourceFactory.java Fri Dec 5 12:30:14 2008
@@ -21,7 +21,7 @@
import java.util.HashMap;
import org.apache.hadoop.chukwa.extraction.engine.datasource.database.DatabaseDS;
-import org.apache.hadoop.chukwa.extraction.engine.datasource.record.RecordDS;
+import org.apache.hadoop.chukwa.extraction.engine.datasource.record.ChukwaRecordDataSource;
public class DataSourceFactory
{
@@ -37,10 +37,7 @@
dataSources.put("MRJob", databaseDS);
dataSources.put("HodJob", databaseDS);
dataSources.put("QueueInfo", databaseDS);
-
- DataSource recordDS = new RecordDS();
- dataSources.put("NameNode", recordDS);
- dataSources.put("ChukwaLocalAgent", recordDS);
+
}
public static DataSourceFactory getInstance()
@@ -64,7 +61,7 @@
}
else
{
- DataSource hsdfsDS = new RecordDS();
+ DataSource hsdfsDS = new ChukwaRecordDataSource();
dataSources.put(datasourceName, hsdfsDS);
return hsdfsDS;
//TODO proto only!
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java Fri Dec 5 12:30:14 2008
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -44,15 +45,20 @@
private DsDirectory()
{
dataConfig = new DataConfig();
- conf = new Configuration();
+ conf = new ChukwaConfiguration();
try
{
fs = FileSystem.get(conf);
- } catch (IOException e)
+ }
+ catch (IOException e)
{
e.printStackTrace();
}
rootFolder = dataConfig.get("chukwa.engine.dsDirectory.rootFolder");
+ if (!rootFolder.endsWith("/"))
+ {
+ rootFolder +="/";
+ }
}
public static DsDirectory getInstance()
@@ -94,7 +100,7 @@
public static void main(String[] args) throws DataSourceException
{
DsDirectory dsd = DsDirectory.getInstance();
- String[] dss = dsd.list("localhost");
+ String[] dss = dsd.list("unknown");
for (String d : dss)
{
System.out.println(d);
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java Fri Dec 5 12:30:14 2008
@@ -34,15 +34,16 @@
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
+import org.apache.hadoop.chukwa.extraction.engine.Token;
import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
-import org.apache.hadoop.chukwa.hicc.ClusterConfig;
+//import org.apache.hadoop.chukwa.hicc.ClusterConfig;
public class DatabaseDS implements DataSource
{
public SearchResult search(SearchResult result, String cluster,
- String dataSource, long t0, long t1, String filter)
+ String dataSource, long t0, long t1, String filter,Token token)
throws DataSourceException
{
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd kk:mm:ss");
@@ -79,8 +80,8 @@
String dateclause = timeField + " >= '" + startS
+ "' and " + timeField + " <= '" + endS + "'";
- ClusterConfig cc = new ClusterConfig();
- String jdbc = cc.getURL(cluster);
+ //ClusterConfig cc = new ClusterConfig();
+ String jdbc = ""; //cc.getURL(cluster);
Connection conn = DriverManager.getConnection(jdbc);
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java Fri Dec 5 12:30:14 2008
@@ -0,0 +1,21 @@
+package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
+
+import java.util.List;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+
+public class ChukwaDSInternalResult
+{
+ List<Record> records = null;
+ String day = null;
+ int hour = 0;
+ int rawIndex = 0;
+ int spill = 1;
+ long position = -1;
+ long currentTs = -1;
+
+ String fileName = null;
+
+ ChukwaRecordKey key = null;
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java Fri Dec 5 12:30:14 2008
@@ -0,0 +1,520 @@
+package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaSearchResult;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
+import org.apache.hadoop.chukwa.extraction.engine.Token;
+import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
+import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+
+public class ChukwaRecordDataSource implements DataSource
+{
+ //TODO need some cleanup after 1st production
+ // First implementation to get it working with the new directory structure
+
+ static Logger log = Logger.getLogger(ChukwaRecordDataSource.class);
+
+ private static final int dayFolder = 100;
+ private static final int hourFolder = 200;
+ private static final int rawFolder = 300;
+
+ static final String[] raws = {"0","5","10","15","20","25","30","35","40","45","50","55"};
+
+ private static FileSystem fs = null;
+ private static ChukwaConfiguration conf = null;
+
+ private static String rootDsFolder = null;
+ private static DataConfig dataConfig = null;
+
+ static
+ {
+ dataConfig = new DataConfig();
+ rootDsFolder = dataConfig.get("chukwa.engine.dsDirectory.rootFolder");
+ conf = new ChukwaConfiguration();
+ try
+ {
+ fs = FileSystem.get(conf);
+ } catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public boolean isThreadSafe()
+ {
+ return true;
+ }
+
+
+ @Override
+ public SearchResult search(SearchResult result, String cluster,
+ String dataSource, long t0, long t1, String filter,Token token)
+ throws DataSourceException
+ {
+ String filePath = rootDsFolder + "/" + cluster + "/";
+
+ log.debug("filePath [" + filePath + "]");
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTimeInMillis(t0);
+ SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
+ int maxCount = 200;
+
+ List<Record> records = new ArrayList<Record>();
+
+ ChukwaDSInternalResult res = new ChukwaDSInternalResult();
+
+ if (token != null)
+ {
+ // token.key = day + "|" + hour + "|" + raw + "|" + spill + "|" + res.currentTs + "|"+ res.position + "|"+ res.fileName;
+ try
+ {
+ String[] vars = token.key.split("\\|");
+ res.day = vars[0];
+ res.hour = Integer.parseInt(vars[1]);
+ res.rawIndex = Integer.parseInt(vars[2]);
+ res.spill = Integer.parseInt(vars[3]);
+ res.currentTs = Long.parseLong(vars[4]);
+ res.position = Long.parseLong(vars[5]);
+ res.fileName = vars[6];
+ log.info("Token is not null! :" + token.key);
+ }
+ catch(Exception e)
+ {
+ log.error("Incalid Key: [" + token.key + "] exception: ", e);
+ }
+ }
+ else
+ {
+ log.debug("Token is null!" );
+ }
+
+ try
+ {
+ do
+ {
+ log.debug("start Date [" + calendar.getTime() + "]");
+ String workingDay = sdf.format(calendar.getTime());
+ int workingHour = calendar.get(Calendar.HOUR_OF_DAY);
+ int startRawIndex = 0;
+ boolean resume = (token != null && res.day != null);
+ if (resume)
+ {
+ // resume from the token position on the first pass only
+ workingDay = res.day;
+ workingHour = res.hour;
+ startRawIndex = res.rawIndex;
+ res.day = null;
+ }
+ else if (token == null)
+ {
+ token = new Token();
+ }
+
+ log.debug("workingDay " + workingDay);
+ log.debug("workingHour " + workingHour);
+
+ if (exist(dayFolder,filePath,dataSource,workingDay,null,null))
+ {
+ // Extract Data for Day
+ if (containsRotateFlag(dayFolder,filePath,dataSource,workingDay,null))
+ {
+ // read data from day
+ // SystemMetrics/20080922/SystemMetrics_20080922.1.evt
+ log.debug("fs.exists(workingDayRotatePath) ");
+ extractRecords(res,ChukwaRecordDataSource.dayFolder,filePath,dataSource,workingDay, null, -1,
+ token, records, maxCount, t0, t1, filter);
+ maxCount = maxCount - records.size();
+ if ( (maxCount <= 0) || (res.currentTs > t1))
+ { break; }
+
+ } // End process Day File
+ else // check for hours
+ {
+ log.debug("check for hours");
+ for (int hour = 0; hour<24;hour ++)
+ {
+ if (resume && hour < workingHour)
+ {
+ continue;
+ }
+ log.debug(" Hour? -->" + filePath + dataSource + "/"+ workingDay+ "/" + hour);
+ if (exist(hourFolder,filePath,dataSource,workingDay,""+hour,null))
+ {
+ if (containsRotateFlag(hourFolder,filePath,dataSource,workingDay,""+hour))
+ {
+ // read data from Hour
+ // SystemMetrics/20080922/12/SystemMetrics_20080922_12.1.evt
+ extractRecords(res,ChukwaRecordDataSource.hourFolder,filePath,dataSource,workingDay, ""+hour, -1,
+ token, records, maxCount, t0, t1, filter);
+ }
+ else // check for raw
+ {
+ log.debug("Working on Raw");
+
+ for(int rawIndex=startRawIndex;rawIndex<12;rawIndex++)
+ {
+ // read data from Raw
+ // SystemMetrics/20080922/0/25/SystemMetrics_20080922_0_25.1.evt
+ if (exist(rawFolder,filePath,dataSource,workingDay,""+hour,raws[rawIndex]))
+ {
+ extractRecords(res,ChukwaRecordDataSource.rawFolder,filePath,dataSource,workingDay, ""+hour, rawIndex,
+ token, records, maxCount, t0, t1, filter);
+ maxCount = maxCount - records.size();
+ if ( (maxCount <= 0) || (res.currentTs > t1))
+ { break; }
+ }
+ else
+ {
+ log.debug("<<<<<<<<<Working on Raw Not exist--> "
+ + filePath + dataSource + "/" + workingDay+ "/" + workingHour + "/" + raws[rawIndex] );
+ }
+ res.spill = 1;
+ }
+ }
+ } // End if (fs.exists(new Path(filePath + workingDay+ "/" + hour)))
+
+ maxCount = maxCount - records.size();
+ if ( (maxCount <= 0) || (res.currentTs > t1))
+ { break; }
+
+ } // End process all Hourly/raw files
+ }
+ }
+
+ maxCount = maxCount - records.size();
+ if ( (maxCount <= 0) || (res.currentTs > t1))
+ { break; }
+
+ // move to the next day
+ calendar.add(Calendar.DAY_OF_MONTH, +1);
+ calendar.set(Calendar.HOUR_OF_DAY, 0);
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+
+ }
+ while (calendar.getTimeInMillis() < t1);
+
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ throw new DataSourceException(e);
+ }
+
+ TreeMap<Long, List<Record>> recordsInResult = result.getRecords();
+ for (Record record : records)
+ {
+ long timestamp = record.getTime();
+ if (recordsInResult.containsKey(timestamp))
+ {
+ recordsInResult.get(timestamp).add(record);
+ }
+ else
+ {
+ List<Record> list = new LinkedList<Record>();
+ list.add(record);
+ recordsInResult.put(timestamp, list);
+ }
+ }
+ result.setToken(token);
+ return result;
+
+ }
+
+ public void extractRecords(ChukwaDSInternalResult res,int directoryType,String rootFolder,String dataSource,String day,String hour,int rawIndex,
+ Token token,List<Record> records,int maxRows,long t0,long t1,String filter) throws Exception
+ {
+ // for each spill file
+ // extract records
+ int spill = res.spill;
+
+ boolean workdone = false;
+ do
+ {
+ String fileName = buildFileName(directoryType,rootFolder,dataSource,spill,day,hour,rawIndex);
+ log.debug("extractRecords : " + fileName);
+
+ if (fs.exists(new Path(fileName)))
+ {
+ readData(res,token,fileName,maxRows,t0,t1,filter);
+ res.spill = spill;
+ List<Record> localRecords = res.records;
+ log.debug("localRecords size : " + localRecords.size());
+ maxRows = maxRows - localRecords.size();
+ if (maxRows <= 0)
+ {
+ workdone = true;
+ }
+ records.addAll(localRecords);
+ log.debug("AFTER fileName [" +fileName + "] count=" + localRecords.size() + " maxCount=" + maxRows);
+ spill ++;
+ }
+ else
+ {
+ // no more spill
+ workdone = true;
+ }
+ }
+ while(!workdone);
+ token.key = day + "|" + hour + "|" + rawIndex + "|" + spill + "|" + res.currentTs + "|"+ res.position + "|" + res.fileName;
+ }
+
+
+ public void readData(ChukwaDSInternalResult res,Token token,String fileName,int maxRows,long t0, long t1,String filter) throws
+ Exception
+ {
+ List<Record> records = new LinkedList<Record>();
+ res.records = records;
+ SequenceFile.Reader r= null;
+ if (filter != null)
+ { filter = filter.toLowerCase();}
+
+ try
+ {
+
+ if (!fs.exists(new Path(fileName)))
+ {
+ log.debug("fileName not there!");
+ return;
+ }
+ log.debug("Parser Open [" +fileName + "]");
+
+ long timestamp = 0;
+ int listSize = 0;
+ ChukwaRecordKey key = new ChukwaRecordKey();
+ ChukwaRecord record = new ChukwaRecord();
+
+ r= new SequenceFile.Reader(fs, new Path(fileName), conf);
+
+ log.debug("readData Open2 [" +fileName + "]");
+ if ( (fileName.equals(res.fileName)) && (res.position != -1))
+ {
+ r.seek(res.position);
+ }
+ res.fileName = fileName;
+
+ while(r.next(key, record))
+ {
+ if (record != null)
+ {
+ res.position = r.getPosition();
+
+ timestamp = record.getTime();
+ res.currentTs = timestamp;
+ log.debug("\nSearch for startDate: " + new Date(t0) + " is :" + new Date(timestamp));
+
+ if (timestamp < t0)
+ {
+ //log.debug("Line not in range. Skipping: " +record);
+ continue;
+ }
+ else if (timestamp < t1)
+ {
+ log.debug("In Range: " + record.toString());
+ boolean valid = false;
+
+ if ( (filter == null || filter.equals("") ))
+ {
+ valid = true;
+ }
+ else if ( isValid(record,filter))
+ {
+ valid = true;
+ }
+
+ if (valid)
+ {
+ records.add(record);
+ record = new ChukwaRecord();
+ listSize = records.size();
+ if (listSize >= maxRows)
+ {
+ // maxRow so stop here
+ //Update token
+ token.key = key.getKey();
+ token.hasMore = true;
+ break;
+ }
+ }
+ else
+ {
+ log.debug("In Range ==================>>>>>>>>> OUT Regex: " + record);
+ }
+ }
+ else
+ {
+ log.debug("Line out of range. Stopping now: " +record);
+ // Update Token
+ token.key = key.getKey();
+ token.hasMore = false;
+ break;
+ }
+ }
+ }
+
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ try
+ {
+ r.close();
+ }
+ catch(Exception e){}
+ }
+ }
+
+ public boolean containsRotateFlag(int directoryType,String rootFolder,String dataSource,String workingDay,String workingHour) throws Exception
+ {
+ boolean contains = false;
+ switch(directoryType)
+ {
+ case ChukwaRecordDataSource.dayFolder:
+ // SystemMetrics/20080922/rotateDone
+ contains = fs.exists(new Path( rootFolder + dataSource + "/" + workingDay+"/rotateDone"));
+ break;
+
+ case ChukwaRecordDataSource.hourFolder:
+ // SystemMetrics/20080922/12/rotateDone
+ contains = fs.exists(new Path( rootFolder + dataSource + "/" + workingDay+ "/" + workingHour +"/rotateDone"));
+ break;
+
+ }
+ return contains;
+ }
+
+ public boolean exist(int directoryType,String rootFolder,String dataSource,String workingDay,String workingHour,String raw) throws Exception
+ {
+ boolean contains = false;
+ switch(directoryType)
+ {
+ case ChukwaRecordDataSource.dayFolder:
+ // SystemMetrics/20080922/rotateDone
+ contains = fs.exists(new Path( rootFolder + dataSource + "/" + workingDay));
+ break;
+
+ case ChukwaRecordDataSource.hourFolder:
+ // SystemMetrics/20080922/12/rotateDone
+ contains = fs.exists(new Path( rootFolder + dataSource + "/" + workingDay+ "/" + workingHour ));
+ break;
+ case ChukwaRecordDataSource.rawFolder:
+ // SystemMetrics/20080922/12/rotateDone
+ contains = fs.exists(new Path( rootFolder + dataSource + "/" + workingDay+ "/" + workingHour + "/" + raw));
+ break;
+
+ }
+ return contains;
+ }
+
+
+ protected boolean isValid(ChukwaRecord record, String filter)
+ {
+ String[] fields = record.getFields();
+ for(String field: fields)
+ {
+ if ( record.getValue(field).toLowerCase().indexOf(filter) >= 0)
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public String buildFileName(int directoryType,String rootFolder,String dataSource,int spill,String day,String hour,int rawIndex)
+ {
+ String fileName = null;
+ // TODO use StringBuilder
+ // TODO revisit the way we're building fileName
+
+ switch(directoryType)
+ {
+ case ChukwaRecordDataSource.dayFolder:
+ // SystemMetrics/20080922/SystemMetrics_20080922.1.evt
+ fileName = rootFolder + "/" + dataSource + "/" + day + "/"
+ + dataSource + "_" + day + "." + spill + ".evt";
+ break;
+
+ case ChukwaRecordDataSource.hourFolder:
+ // SystemMetrics/20080922/12/SystemMetrics_20080922_12.1.evt
+ fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/"
+ + dataSource + "_" + day + "_" + hour + "." + spill + ".evt";
+ break;
+
+ case ChukwaRecordDataSource.rawFolder:
+ // SystemMetrics/20080922/0/25/SystemMetrics_20080922_0_25.1.evt
+ fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/" + raws[rawIndex] + "/"
+ + dataSource + "_" + day + "_" + hour + "_" + raws[rawIndex] + "." + spill + ".evt";
+ break;
+ }
+ log.debug("buildFileName :" + fileName);
+ return fileName;
+ }
+
+ public static void main(String[] args) throws DataSourceException
+ {
+ ChukwaRecordDataSource ds = new ChukwaRecordDataSource();
+ SearchResult result = new ChukwaSearchResult();
+ result.setRecords( new TreeMap<Long,List<Record>>());
+ String cluster = args[0];
+ String dataSource = args[1];
+ long t0 = Long.parseLong(args[2]);
+ long t1 = Long.parseLong(args[3]);
+ String filter = null;
+ Token token = null;
+
+ if (args.length >= 5 && !args[4].equalsIgnoreCase("null"))
+ {
+ filter = args[4];
+ }
+ if (args.length == 6)
+ {
+ token = new Token();
+ token.key = args[5];
+ System.out.println("token :" + token.key);
+ }
+
+ System.out.println("cluster :" + cluster);
+ System.out.println("dataSource :" + dataSource);
+ System.out.println("t0 :" + t0);
+ System.out.println("t1 :" + t1);
+ System.out.println("filter :" +filter );
+
+
+ ds.search(result, cluster, dataSource, t0, t1, filter,token);
+ TreeMap<Long, List<Record>> records = result.getRecords();
+ Iterator<Long> it = records.keySet().iterator();
+
+ while(it.hasNext())
+ {
+ long ts = it.next();
+ System.out.println("\n\nTimestamp: " + new Date(ts));
+ List<Record> list = records.get(ts);
+ for (int i=0;i<list.size();i++)
+ {
+ System.out.println(list.get(i));
+ }
+ }
+
+ if (result.getToken() != null)
+ { System.out.println("Key -->" + result.getToken().key);}
+ }
+}
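For reference, buildFileName() above encodes the three repository layouts; a
minimal sketch of calling it directly (the repository root is illustrative,
and the folder-type constants are assumed accessible as used in the switch):

    ChukwaRecordDataSource ds = new ChukwaRecordDataSource();
    // day layout:  /chukwa/repos/SystemMetrics/20080922/SystemMetrics_20080922.1.evt
    String dayFile = ds.buildFileName(ChukwaRecordDataSource.dayFolder,
        "/chukwa/repos", "SystemMetrics", 1, "20080922", null, 0);
    // hour layout: /chukwa/repos/SystemMetrics/20080922/12/SystemMetrics_20080922_12.1.evt
    String hourFile = ds.buildFileName(ChukwaRecordDataSource.hourFolder,
        "/chukwa/repos", "SystemMetrics", 1, "20080922", "12", 0);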
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java Fri Dec 5 12:30:14 2008
@@ -19,18 +19,17 @@
package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
import java.io.IOException;
-import java.util.Calendar;
+import java.util.Date;
import java.util.LinkedList;
import java.util.List;
-import java.util.Date;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
public class ChukwaSequenceFileParser
{
@@ -67,60 +66,59 @@
long offset = 0;
// HdfsWriter.HdfsWriterKey key = new HdfsWriter.HdfsWriterKey();
- Text key = new Text();
-
- ChukwaRecord evt = new ChukwaRecord();
- while(r.next(key, evt))
+ ChukwaRecordKey key = new ChukwaRecordKey();
+ ChukwaRecord record = new ChukwaRecord();
+
+ while(r.next(key, record))
{
lineCount ++;
- System.out.println("NameNodeParser Line [" +evt.getValue(Record.bodyField) + "]");
+ System.out.println("NameNodeParser Line [" +record.getValue(Record.bodyField) + "]");
- if (evt != null)
+ if (record != null)
{
- timestamp = evt.getTime();
+ timestamp = record.getTime();
if (timestamp < t0)
{
- System.out.println("Line not in range. Skipping: " +evt.getValue(Record.bodyField));
+ System.out.println("Line not in range. Skipping: " +record.getValue(Record.bodyField));
System.out.println("Search for: " + new Date(t0) + " is :" + new Date(timestamp));
continue;
}
else if ((timestamp < t1) && (offset < maxOffset )) //JB (epochTS < maxDate)
{
- System.out.println("In Range: " + evt.getValue(Record.bodyField));
+ System.out.println("In Range: " + record.getValue(Record.bodyField));
boolean valid = false;
if ( (filter == null || filter.equals("") ))
{
valid = true;
}
- else if (evt.getValue(Record.rawField).toLowerCase().indexOf(filter) > 0)
- {
- System.out.println("MATCH " + filter + "===========================>>>>>>>" + evt.getValue(Record.rawField));
- valid = true;
- }
+ else if ( isValid(record,filter))
+ {
+ valid = true;
+ }
if (valid)
{
- records.add(evt);
-evt = new ChukwaRecord();
+ records.add(record);
+ record = new ChukwaRecord();
listSize = records.size();
if (listSize > maxRows)
{
records.remove(0);
- System.out.println("==========>>>>>REMOVING: " + evt.getValue(Record.bodyField));
+ System.out.println("==========>>>>>REMOVING: " + record.getValue(Record.bodyField));
}
}
else
{
- System.out.println("In Range ==================>>>>>>>>> OUT Regex: " + evt.getValue(Record.bodyField));
+ System.out.println("In Range ==================>>>>>>>>> OUT Regex: " + record.getValue(Record.bodyField));
}
}
else
{
- System.out.println("Line out of range. Stopping now: " +evt.getValue(Record.bodyField));
+ System.out.println("Line out of range. Stopping now: " +record.getValue(Record.bodyField));
break;
}
}
@@ -146,26 +144,17 @@
return records;
}
- public static void main(String[] args) throws Throwable
+
+ protected static boolean isValid(ChukwaRecord record, String filter)
{
- Configuration conf = new Configuration();
-
- FileSystem fs = FileSystem.get(conf);//FileSystem.get(new URI(fsURL), conf);
- Calendar c = Calendar.getInstance();
- c.add(Calendar.MONTH, -2);
-
- ChukwaSequenceFileParser.readData( "/tmp/t1", "NameNode",
- 200, new java.util.Date().getTime(),
- c.getTimeInMillis(), Long.MAX_VALUE, null,
- args[0], fs, conf);
-
- SequenceFile.Reader r= new SequenceFile.Reader(fs, new Path(args[0]), conf);
- Text key = new Text();
-
- ChukwaRecord evt = new ChukwaRecord();
- while(r.next(key, evt))
- {
- System.out.println( evt);
- }
+ String[] fields = record.getFields();
+ for(String field: fields)
+ {
+ if ( record.getValue(field).toLowerCase().indexOf(filter) >= 0)
+ {
+ return true;
+ }
+ }
+ return false;
}
}
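The isValid() helper introduced above (duplicated from ChukwaRecordDataSource)
matches the filter as a substring of the lower-cased value of every field, and
its indexOf(...) >= 0 test also matches at offset 0, which the removed
rawField check (> 0) silently missed. Note the field value is lower-cased but
the filter is not, so callers are expected to pass a lower-case filter. A
small illustration with made-up fields:

    ChukwaRecord record = new ChukwaRecord();
    record.add("csource", "host1.example.com");
    record.add("body", "ERROR: namenode checkpoint failed");
    isValid(record, "error");   // true: matches at offset 0 of the body field
    isValid(record, "ERROR");   // false: the filter itself is not lower-cased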
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/RecordDS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/RecordDS.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/RecordDS.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/RecordDS.java Fri Dec 5 12:30:14 2008
@@ -19,20 +19,19 @@
package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
import java.io.IOException;
-import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.Calendar;
-import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeMap;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaSearchResult;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
+import org.apache.hadoop.chukwa.extraction.engine.Token;
import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
-import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.fs.FileSystem;
public class RecordDS implements DataSource
@@ -64,7 +63,7 @@
String dataSource,
long t0,
long t1,
- String filter)
+ String filter,Token token)
throws DataSourceException
{
@@ -77,20 +76,20 @@
TreeMap<Long, List<Record>> records = result.getRecords();
int maxCount = 200;
-
+ SimpleDateFormat sdf = new java.text.SimpleDateFormat("_yyyyMMdd_HH_");
do
{
System.out.println("start Date [" + calendar.getTime() + "]");
- String fileName = new java.text.SimpleDateFormat("_yyyy_MM_dd_HH").format(calendar.getTime());
+ String fileName = sdf.format(calendar.getTime());
int minutes = calendar.get(Calendar.MINUTE);
int dec = minutes/10;
- fileName += "_" + dec ;
+ fileName += dec ;
int m = minutes - (dec*10);
if (m < 5)
- { fileName += "0.evt";}
+ { fileName += "0.1.evt";}
else
- { fileName += "5.evt";}
+ { fileName += "5.1.evt";}
fileName = filePath + "/" + dataSource + fileName;
@@ -145,39 +144,6 @@
return result;
}
-
-
- public static void main(String[] args) throws DataSourceException
- {
- long t1 = 0;
- long t0 = 0;
- System.out.println("Hello");
- Calendar calendar = Calendar.getInstance();
- Date d1;
- try
- {
- d1 = new java.text.SimpleDateFormat ("dd/MM/yyyy HH:mm:ss").parse("05/06/2008 19:31:05");
- calendar.setTime(d1);
- t1 = calendar.getTimeInMillis();
- d1 = new java.text.SimpleDateFormat ("dd/MM/yyyy HH:mm:ss").parse("05/06/2008 19:26:05");
- calendar.setTime(d1);
- t0 = calendar.getTimeInMillis();
-
- } catch (ParseException e)
- {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
-
- String filter = null;
- RecordDS dao = new RecordDS();
- SearchResult result = new ChukwaSearchResult();
-
- TreeMap<Long, List<Record>> records = new TreeMap<Long,List<Record>> ();
- result.setRecords(records);
-
- dao.search(result,"output2","NameNode",t0,t1,filter);
- }
public boolean isThreadSafe()
{
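The reworked filename logic in RecordDS.search() buckets time into five-minute
spill files and always probes spill 1; isolated, the rule looks like this (the
path and data source are illustrative):

    SimpleDateFormat sdf = new SimpleDateFormat("_yyyyMMdd_HH_");
    Calendar calendar = Calendar.getInstance();
    String fileName = sdf.format(calendar.getTime());
    int minutes = calendar.get(Calendar.MINUTE);
    int dec = minutes / 10;                // tens digit of the minute
    fileName += dec;
    int m = minutes - (dec * 10);          // units digit of the minute
    fileName += (m < 5) ? "0.1.evt" : "5.1.evt";
    // e.g. 2008-09-22 12:37 -> SystemMetrics_20080922_12_35.1.evt
    fileName = "/chukwa/repos/output2" + "/" + "SystemMetrics" + fileName;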
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Chart.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Chart.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Chart.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Chart.java Fri Dec 5 12:30:14 2008
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.hicc;
+
+import java.io.PrintWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.TreeMap;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Date;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.hadoop.chukwa.hicc.ColorPicker;
+
+
+public class Chart {
+ private String id;
+ private String title;
+ private String graphType;
+ private ArrayList<TreeMap<String, TreeMap<String, Double>>> dataset;
+ private ArrayList<String> chartType;
+ private boolean xLabelOn;
+ private boolean yLabelOn;
+ private boolean yRightLabelOn;
+ private int width;
+ private int height;
+ private List<String> xLabelRange;
+ private HttpServletRequest request = null;
+ private boolean legend;
+ private String xLabel="";
+ private String yLabel="";
+ private String yRightLabel="";
+ private int datasetCounter=0;
+ private double max=0;
+ private int seriesCounter=0;
+ private List<String> rightList;
+ private boolean userDefinedMax = false;
+ private String[] seriesOrder=null;
+
+ public Chart(HttpServletRequest request) {
+ if(request.getParameter("boxId")!=null) {
+ this.id=request.getParameter("boxId");
+ } else {
+ this.id="0";
+ }
+ this.title="Untitled Chart";
+ this.graphType="image";
+ this.xLabelOn=true;
+ this.yLabelOn=true;
+ this.width=400;
+ this.height=200;
+ this.request=request;
+ this.legend=true;
+ this.max=0;
+ this.datasetCounter=0;
+ this.seriesCounter=0;
+ this.rightList = new ArrayList<String>();
+ this.userDefinedMax=false;
+ this.seriesOrder=null;
+ }
+
+ public void setYMax(double max) {
+ this.max=max;
+ this.userDefinedMax=true;
+ }
+
+ public void setSize(int width, int height) {
+ this.width=width;
+ this.height=height;
+ }
+ public void setGraphType(String graphType) {
+ if(graphType!=null) {
+ this.graphType = graphType;
+ }
+ }
+
+ public void setTitle(String title) {
+ this.title=title;
+ }
+
+ public void setId(String id) {
+ this.id=id;
+ }
+
+ public void setDataSet(String chartType, TreeMap<String, TreeMap<String, Double>> data) {
+ if(this.dataset==null) {
+ this.dataset = new ArrayList<TreeMap<String, TreeMap<String, Double>>>();
+ this.chartType = new ArrayList<String>();
+ }
+ this.dataset.add(data);
+ this.chartType.add(chartType);
+ }
+
+ public void setSeriesOrder(String[] metrics) {
+ this.seriesOrder = metrics;
+ }
+
+ public void setXAxisLabels(boolean toggle) {
+ xLabelOn = toggle;
+ }
+
+ public void setYAxisLabels(boolean toggle) {
+ yLabelOn = toggle;
+ }
+
+ public void setYAxisRightLabels(boolean toggle) {
+ yRightLabelOn = toggle;
+ }
+
+ public void setXAxisLabel(String label) {
+ xLabel = label;
+ }
+
+ public void setYAxisLabel(String label) {
+ yLabel = label;
+ }
+
+ public void setYAxisRightLabel(String label) {
+ yRightLabel = label;
+ }
+
+ public void setXLabelsRange(List<String> range) {
+ xLabelRange = range;
+ }
+
+ public void setLegend(boolean toggle) {
+ legend = toggle;
+ }
+ public String plot() {
+ String output="";
+ if(dataset==null) {
+ output = "No Data available.";
+ return output;
+ }
+ String dateFormat="%H:%M";
+ if(xLabel.equals("Time")) {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long xMin;
+ try {
+ xMin = Long.parseLong(xLabelRange.get(0));
+ long xMax = Long.parseLong(xLabelRange.get(xLabelRange.size()-1));
+ if(xMax-xMin>31536000000L) {
+ dateFormat="%y";
+ } else if(xMax-xMin>2592000000L) {
+ dateFormat="%y-%m";
+ } else if(xMax-xMin>604800000L) {
+ dateFormat="%m-%d";
+ } else if(xMax-xMin>86400000L) {
+ dateFormat="%m-%d %H:%M";
+ }
+ } catch (NumberFormatException e) {
+ dateFormat="%y-%m-%d %H:%M";
+ }
+ }
+ output = "<html><link href=\"/hicc/css/default.css\" rel=\"stylesheet\" type=\"text/css\">\n" +
+ "<body onresize=\"wholePeriod()\"><script type=\"text/javascript\" src=\"/hicc/js/jquery-1.2.6.min.js\"></script>\n"+
+ "<script type=\"text/javascript\" src=\"/hicc/js/jquery.flot.pack.js\"></script>\n"+
+ "<script type=\"text/javascript\" src=\"/hicc/js/excanvas.pack.js\"></script>\n"+
+ "<center>"+title+"</center>\n"+
+ "<div id=\"placeholder\" style=\"width:"+this.width+"px;height:"+this.height+"px;\"></div>\n"+
+ "<div id=\"placeholderLegend\"></div>\n"+
+ "<input type=\"hidden\" id=\"boxId\" value=\"iframe"+this.id+"\">\n"+
+ "<script type=\"text/javascript\" src=\"/hicc/js/flot.extend.js\">\n" +
+ "</script>\n" +
+ "<script type=\"text/javascript\">\n"+
+ "var cw = document.body.clientWidth-70;\n"+
+ "var ch = document.body.clientHeight-50;\n"+
+ "document.getElementById('placeholder').style.width=cw+'px';\n"+
+ "document.getElementById('placeholder').style.height=ch+'px';\n"+
+ "_options={\n"+
+ " points: { show: false },\n"+
+ " xaxis: { timeformat: \""+dateFormat+"\",\n"+
+ " mode: \"time\"\n"+
+ " },\n"+
+ " selection: { mode: \"x\" },\n"+
+ " grid: {\n"+
+ " clickable: true,\n"+
+ " hoverable: true,\n"+
+ " tickColor: \"#C0C0C0\",\n"+
+ " backgroundColor:\"#FFFFFF\"\n"+
+ " },\n"+
+ " legend: { show: "+this.legend+" },\n"+
+ " yaxis: { ";
+ boolean stack = false;
+ for(String type : this.chartType) {
+ if(type.startsWith("stack")) {
+ stack=true;
+ }
+ }
+ if(stack) {
+ output = output + "mode: \"stack\", ";
+ }
+ if(userDefinedMax) {
+ output = output + "tickFormatter: function(val, axis) { " +
+ "return val.toFixed(axis.tickDecimals) + \" %\"; }";
+ } else {
+ output = output + "tickFormatter: function(val, axis) { " +
+ "if (val > 1000000000000000) return (val / 1000000000000000).toFixed(axis.tickDecimals) + \"PB\";" +
+ "else if (val > 1000000000000) return (val / 1000000000000).toFixed(axis.tickDecimals) + \"TB\";" +
+ "else if (val > 1000000000) return (val / 1000000000).toFixed(axis.tickDecimals) + \"GB\";" +
+ "else if (val > 1000000) return (val / 1000000).toFixed(axis.tickDecimals) + \"MB\";" +
+ "else if (val > 1000) return (val / 1000).toFixed(axis.tickDecimals) + \"KB\";" +
+ "else return val.toFixed(axis.tickDecimals) + \"B\"; }";
+ }
+ if(userDefinedMax) {
+ output = output + ", min:0, max:"+this.max;
+ }
+ output = output + "}\n";
+ output = output +
+ " };\n"+
+ "_series=[\n";
+ ArrayList<Long> numericLabelRange = new ArrayList<Long>();
+ if(xLabel.equals("Time")) {
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ for(int i=0;i<xLabelRange.size();i++) {
+ try {
+ Date d = formatter.parse(xLabelRange.get(i));
+ numericLabelRange.add(d.getTime());
+ } catch(Exception e) {
+ // skip labels that do not parse as "yyyy-MM-dd HH:mm:ss"
+ }
+ }
+ } else {
+ for(int i=0;i<xLabelRange.size();i++) {
+ numericLabelRange.add(Long.parseLong(xLabelRange.get(i)));
+ }
+ }
+ ColorPicker cp = new ColorPicker();
+ int i=0;
+ for(TreeMap<String, TreeMap<String, Double>> dataMap : this.dataset) {
+ String[] keyNames = null;
+ if(this.seriesOrder!=null) {
+ keyNames = this.seriesOrder;
+ } else {
+ keyNames = ((String[]) dataMap.keySet().toArray(new String[dataMap.size()]));
+ }
+ int counter=0;
+ if(i!=0) {
+ if(!this.userDefinedMax) {
+ this.max=0;
+ }
+ }
+ for(String ki : keyNames) {
+ TreeMap<String, Double> data = dataMap.get(ki);
+ if(data!=null) {
+ for(String dp: data.keySet()) {
+ try {
+ if(data.get(dp)>this.max) {
+ if(!this.userDefinedMax) {
+ this.max=data.get(dp);
+ }
+ }
+ } catch (NullPointerException e) {
+ // skip if this data point does not exist.
+ }
+ }
+ }
+ }
+ for(String seriesName : keyNames) {
+ int counter2=0;
+ if(counter!=0) {
+ output+=",";
+ }
+ String param="fill: false";
+ String type="lines";
+ if(this.chartType.get(i).equals("stack-area") || this.chartType.get(i).equals("area")) {
+ param="fill: true";
+ }
+ if(this.chartType.get(i).equals("bar")) {
+ type="bars";
+ param="stepByStep: true";
+ }
+ output+=" {"+type+": { show: true, "+param+" }, color: \""+cp.get(counter+1)+"\", label: \""+seriesName+"\", ";
+ String showYAxis="false";
+ String shortRow="false";
+ if(counter==0 || i>0) {
+ showYAxis="true";
+ shortRow="false";
+ }
+ output+=" row: { show: "+showYAxis+",shortRow:"+shortRow+", showYAxis:"+showYAxis+"}, data:[";
+ TreeMap<String, Double> data = dataMap.get(seriesName);
+ for(String dp : data.keySet()) {
+ if(counter2!=0) {
+ output+=",";
+ }
+ output+="["+dp+","+data.get(dp)+"]";
+ counter2++;
+ }
+ output+="], min:0, max:"+this.max+"}";
+ counter++;
+ }
+ i++;
+ }
+ output+=" ];\n"+
+ " wholePeriod();</script></body></html>\n";
+ return output;
+ }
+}
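A minimal sketch of driving Chart from a servlet (the request, series name,
and values are illustrative; labels are epoch milliseconds, which is what the
DatasetMapper change below produces, and out is an assumed PrintWriter):

    Chart chart = new Chart(request);
    chart.setTitle("Disk Throughput");
    chart.setXAxisLabel("Time");
    TreeMap<String, TreeMap<String, Double>> series =
        new TreeMap<String, TreeMap<String, Double>>();
    TreeMap<String, Double> points = new TreeMap<String, Double>();
    points.put("1222084800000", 1048576.0);   // epoch millis -> bytes/sec
    points.put("1222084860000", 2097152.0);
    series.put("host1", points);
    chart.setDataSet("line", series);
    chart.setXLabelsRange(new ArrayList<String>(points.keySet()));
    out.println(chart.plot());                // emits the flot HTML/JS page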
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java Fri Dec 5 12:30:14 2008
@@ -23,7 +23,7 @@
public class ClusterConfig {
public static HashMap<String, String> clusterMap = new HashMap<String, String>();
- private String path=System.getenv("CHUKWA_HOME")+File.separator+"conf"+File.separator;
+ private String path=System.getenv("CHUKWA_CONF_DIR")+File.separator;
static public String getContents(File aFile) {
//...checks on aFile are elided
StringBuffer contents = new StringBuffer();
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/DatasetMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/DatasetMapper.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/DatasetMapper.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/DatasetMapper.java Fri Dec 5 12:30:14 2008
@@ -18,6 +18,8 @@
package org.apache.hadoop.chukwa.hicc;
+import java.text.SimpleDateFormat;
+import java.util.TreeMap;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
@@ -29,19 +31,21 @@
public class DatasetMapper {
private String jdbc;
private static Log log = LogFactory.getLog(DatasetMapper.class);
- private HashMap<String, ArrayList<Double>> dataset;
+ private TreeMap<String, TreeMap<String, Double>> dataset;
private List<String> labels;
public DatasetMapper(String jdbc) {
this.jdbc=jdbc;
- this.dataset = new HashMap<String, ArrayList<Double>>();
+ this.dataset = new TreeMap<String, TreeMap<String, Double>>();
this.labels = new ArrayList<String>();
}
- public void execute(String query, boolean groupBySecondColumn) {
+ public void execute(String query, boolean groupBySecondColumn, boolean calculateSlope, String formatTime) {
+ SimpleDateFormat sdf = null;
dataset.clear();
try {
// The newInstance() call is a work around for some
// broken Java implementations
- Class.forName("com.mysql.jdbc.Driver").newInstance();
+ String jdbcDriver = System.getenv("JDBC_DRIVER");
+ Class.forName(jdbcDriver).newInstance();
} catch (Exception ex) {
// handle the error
}
@@ -61,11 +65,18 @@
rs = stmt.getResultSet();
ResultSetMetaData rmeta = rs.getMetaData();
int col=rmeta.getColumnCount();
+ double[] previousArray = new double[col+1];
+ for(int k=0;k<col;k++) {
+ previousArray[k]=0.0;
+ }
int i=0;
- java.util.ArrayList<Double> data = null;
+ java.util.TreeMap<String, Double> data = null;
+ HashMap<String, Double> previousHash = new HashMap<String, Double>();
HashMap<String, Integer> xAxisMap = new HashMap<String, Integer>();
while (rs.next()) {
- String label = rs.getString(1);
+ String label = "";
+ long time = rs.getTimestamp(1).getTime();
+ label = ""+time;
if(!xAxisMap.containsKey(label)) {
xAxisMap.put(label, i);
labels.add(label);
@@ -74,15 +85,33 @@
if(groupBySecondColumn) {
String item = rs.getString(2);
// Get the data from the row using the series column
- double current = rs.getDouble(3);
- if(current>max) {
- max=current;
- }
data = dataset.get(item);
if(data == null) {
- data = new java.util.ArrayList<Double>();
+ data = new java.util.TreeMap<String, Double>();
+ }
+ if(calculateSlope) {
+ double current = rs.getDouble(3);
+ double tmp = 0.0;
+ if(data.size()>1) {
+ tmp = current - previousHash.get(item).doubleValue();
+ } else {
+ tmp = 0;
+ }
+ if(tmp<0) {
+ tmp=0;
+ }
+ if(tmp>max) {
+ max=tmp;
+ }
+ previousHash.put(item,current);
+ data.put(label, tmp);
+ } else {
+ double current = rs.getDouble(3);
+ if(current>max) {
+ max=current;
+ }
+ data.put(label, current);
}
- data.add(rs.getDouble(3));
dataset.put(item,data);
} else {
for(int j=2;j<=col;j++) {
@@ -94,16 +123,30 @@
}
data = dataset.get(item);
if(data == null) {
- data = new java.util.ArrayList<Double>();
+ data = new java.util.TreeMap<String, Double>();
+ }
+ if(calculateSlope) {
+ double tmp = rs.getDouble(j);
+ if(data.size()>1) {
+ tmp = tmp - previousArray[j];
+ } else {
+ tmp = 0.0;
+ }
+ previousArray[j]=current; // remember the raw reading for the next row's delta
+ if(tmp<0) {
+ tmp=0;
+ }
+ data.put(label, tmp);
+ } else {
+ data.put(label, current);
}
- data.add(rs.getDouble(j));
dataset.put(item,data);
}
}
}
labelsCount=i;
} else {
- log.error("query is not executed.");
+ log.error("query is not executed.");
}
// Now do something with the ResultSet ....
} catch (SQLException ex) {
@@ -146,7 +189,7 @@
public List<String> getXAxisMap() {
return labels;
}
- public HashMap<String, ArrayList<Double>> getDataset() {
+ public TreeMap<String, TreeMap<String, Double>> getDataset() {
return dataset;
}
}
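The calculateSlope path above turns monotonically increasing counters into
per-interval deltas, clamping negatives that appear when a counter resets
(the class keys its "have a previous value" test off data.size()>1, so the
first points of each series come out as zero). The rule in isolation, with
made-up samples:

    double previous = 0.0;
    TreeMap<String, Double> series = new TreeMap<String, Double>();
    long[] times = { 1000L, 2000L, 3000L };
    double[] counters = { 50.0, 70.0, 65.0 }; // 65 simulates a counter reset
    for (int i = 0; i < times.length; i++) {
        double delta = (i > 0) ? counters[i] - previous : 0.0;
        if (delta < 0) {
            delta = 0;                        // clamp: counter went backwards
        }
        previous = counters[i];
        series.put("" + times[i], delta);     // label is the epoch timestamp
    }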