Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/12/05 21:30:21 UTC

svn commit: r723855 [9/23] - in /hadoop/core/trunk: ./ src/contrib/ src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/conf/ src/contrib/chukwa/docs/ src/contrib/chukwa/docs/paper/ src/contrib/chukwa/hadoop-packaging/ src/contrib/chukwa/lib...

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,81 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class MRJobReduceProcessor implements ReduceProcessor
+{
+	static Logger log = Logger.getLogger(MRJobReduceProcessor.class);
+	@Override
+	public String getDataType()
+	{
+		return MRJobReduceProcessor.class.getName();
+	}
+
+	@Override
+	public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter)
+	{
+		try
+		{
+			HashMap<String, String> data = new HashMap<String, String>();
+			
+			ChukwaRecord record = null;
+			String[] fields = null;
+			while(values.hasNext())
+			{
+				record = values.next();
+				fields = record.getFields();
+				for(String field: fields)
+				{
+					data.put(field, record.getValue(field));
+				}
+			}
+			
+			//Extract initial time: SUBMIT_TIME
+			long initTime = Long.parseLong(data.get("SUBMIT_TIME"));
+			
+			// Extract HodId
+			// maybe use a regex to extract this and load it from configuration
+			// JOBCONF="/user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml"
+			String jobConf = data.get("JOBCONF");
+			int idx = jobConf.indexOf("mapredsystem/");
+			idx += 13;
+			int idx2 = jobConf.indexOf(".", idx);
+			data.put("HodId", jobConf.substring(idx, idx2)); 
+			
+			ChukwaRecordKey newKey = new ChukwaRecordKey();
+			newKey.setKey(""+initTime);
+			newKey.setReduceType("MRJob");
+			
+			ChukwaRecord newRecord = new ChukwaRecord();
+			newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
+			newRecord.setTime(initTime);
+			newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
+			Iterator<String> it = data.keySet().iterator();
+			while(it.hasNext())
+			{
+				String field = it.next();
+				newRecord.add(field, data.get(field));
+			}
+
+			output.collect(newKey, newRecord);
+		}
+		catch (IOException e)
+		{
+			log.warn("Unable to collect output in MRJobReduceProcessor [" + key + "]", e);
+			e.printStackTrace();
+		}
+
+	}
+
+}
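
The comment above suggests swapping the fixed-offset substring logic for a configurable regex. A minimal sketch of that alternative, assuming the JOBCONF layout shown in the comment (HodIdExtractor and its pattern are hypothetical, not part of this commit):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class HodIdExtractor {
        // In practice the pattern could be loaded from configuration instead of hard-coded.
        private static final Pattern HOD_ID = Pattern.compile("mapredsystem/([^./]+)\\.");

        public static String extract(String jobConf) {
            Matcher m = HOD_ID.matcher(jobConf);
            return m.find() ? m.group(1) : null;
        }

        public static void main(String[] args) {
            String jobConf = "/user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml";
            System.out.println(extract(jobConf)); // prints 563976
        }
    }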

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessor.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public interface ReduceProcessor
+{
+	public String getDataType();
+	public void process(ChukwaRecordKey key,Iterator<ChukwaRecord> values,
+						OutputCollector<ChukwaRecordKey, 
+						ChukwaRecord> output, Reporter reporter);
+}
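
For orientation, a minimal pass-through implementation of this interface could look like the sketch below (PassThroughReducer is a hypothetical name; the IdentityReducer that the factory below falls back to is not included in this part of the diff):

    package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class PassThroughReducer implements ReduceProcessor {
        public String getDataType() {
            return PassThroughReducer.class.getName();
        }

        public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
                            OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
                            Reporter reporter) {
            try {
                // Emit every incoming record unchanged, under the same key.
                while (values.hasNext()) {
                    output.collect(key, values.next());
                }
            } catch (IOException e) {
                // A real implementation would log here, as MRJobReduceProcessor does.
            }
        }
    }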

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+
+
+
+public class ReduceProcessorFactory
+{
+	static Logger log = Logger.getLogger(ReduceProcessorFactory.class);
+	
+	// TODO
+	//	add new mapper package at the end.
+	//	We should have a more generic way to do this.
+	//	Ex: read from config
+	//	list of alias
+	//	and
+	//	alias -> processor class
+	
+	// ******** WARNING ********
+	// If the ReduceProcessor is not there use Identity instead
+	
+	
+	private static HashMap<String,ReduceProcessor > processors =
+	    new HashMap<String, ReduceProcessor>(); // registry
+		
+	private ReduceProcessorFactory()
+	{}
+	
+	public static ReduceProcessor getProcessor(String reduceType)
+	 throws UnknownReduceTypeException
+	{
+		String path = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer."+reduceType;
+		if (processors.containsKey(reduceType)) {
+			return processors.get(reduceType);
+		} else {
+			ReduceProcessor processor = null;
+			try {
+				processor = (ReduceProcessor)Class.forName(path).getConstructor().newInstance();
+			} 
+			catch(ClassNotFoundException e) 
+			{
+				// ******** WARNING ********
+				// If the ReduceProcessor is not there use Identity instead
+				processor = getProcessor("IdentityReducer");
+				register(reduceType,processor);
+				return processor;
+			} 
+			catch(Exception e) {
+			  throw new UnknownReduceTypeException("error constructing processor", e);
+			}
+			
+			//TODO using a ThreadSafe/reuse flag to actually decide if we want 
+			// to reuse the same processor again and again
+			register(reduceType,processor);
+			return processor;
+		}
+	}
+	
+	  /** Register a specific parser for a {@link ReduceProcessor}
+	   * implementation. */
+	  public static synchronized void register(String reduceType,
+	                                         ReduceProcessor processor) 
+	  {
+		  log.info("register " + processor.getClass().getName() + " for this recordType :" + reduceType);
+		  if (processors.containsKey(reduceType))
+			{
+			  throw new DuplicateReduceProcessorException("Duplicate processor for recordType:" + reduceType);
+			}
+		  ReduceProcessorFactory.processors.put(reduceType, processor);
+	  }
+
+}
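
The factory keys processors by reduceType, which is the first half of ChukwaRecordKey. A hedged sketch of the lookup from a caller's point of view (the real Demux reducer that performs this dispatch is not part of this diff chunk):

    package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;

    import java.util.Iterator;

    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class ReduceDispatchSketch {
        public static void dispatch(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
                                    OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
                                    Reporter reporter) throws UnknownReduceTypeException {
            // Unknown reduce types fall back to the identity processor inside the factory;
            // only construction failures surface as UnknownReduceTypeException.
            ReduceProcessor processor = ReduceProcessorFactory.getProcessor(key.getReduceType());
            processor.process(key, values, output, reporter);
        }
    }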

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/SystemMetrics.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class SystemMetrics  implements ReduceProcessor {
+	static Logger log = Logger.getLogger(SystemMetrics.class);
+	@Override
+	public String getDataType() {
+		return this.getClass().getName();
+	}
+
+	@Override
+	public void process(ChukwaRecordKey key, 
+						Iterator<ChukwaRecord> values,
+						OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+						Reporter reporter) {
+		try {
+			
+			ChukwaRecord record = null;
+			ChukwaRecord newRecord = new ChukwaRecord();
+      
+			while(values.hasNext()) {
+				record = values.next();
+				newRecord.setTime(record.getTime());
+
+				if(record.containsField("IFACE")) {
+					if(record.containsField("rxpck/s")) {
+						if (record.containsField("rxbyt/s") && record.containsField("txbyt/s")) {
+							double netBusyPcnt=0, netRxByts=0, netTxByts=0, netSpeed=128000000.00;
+							netRxByts=Double.parseDouble(record.getValue("rxbyt/s"));
+							netTxByts=Double.parseDouble(record.getValue("txbyt/s"));
+							netBusyPcnt=(netRxByts/netSpeed*100)+(netTxByts/netSpeed*100);
+							record.add(record.getValue("IFACE")+"_busy_pcnt", "" + netBusyPcnt);
+							record.add("csource", record.getValue("csource"));
+						}						
+						record.add(record.getValue("IFACE")+".rxbyt/s", record.getValue("rxbyt/s"));
+						record.add(record.getValue("IFACE")+".rxpck/s", record.getValue("rxpck/s"));
+						record.add(record.getValue("IFACE")+".txbyt/s", record.getValue("txbyt/s"));
+						record.add(record.getValue("IFACE")+".txpck/s", record.getValue("txpck/s"));
+						record.removeValue("rxbyt/s");
+						record.removeValue("rxpck/s");
+						record.removeValue("txbyt/s");
+						record.removeValue("txpck/s");
+					}
+					if(record.containsField("rxerr/s")) {
+						record.add(record.getValue("IFACE")+".rxerr/s", record.getValue("rxerr/s"));
+						record.add(record.getValue("IFACE")+".rxdrop/s", record.getValue("rxdrop/s"));
+						record.add(record.getValue("IFACE")+".txerr/s", record.getValue("txerr/s"));
+						record.add(record.getValue("IFACE")+".txdrop/s", record.getValue("txdrop/s"));
+						record.removeValue("rxerr/s");						
+						record.removeValue("rxdrop/s");
+						record.removeValue("txerr/s");
+						record.removeValue("txdrop/s");
+					}
+					record.removeValue("IFACE");
+				}
+				
+				if(record.containsField("Device:")) {
+					record.add(record.getValue("Device:")+".rkB/s", record.getValue("rkB/s"));
+					record.add(record.getValue("Device:")+".wkB/s", record.getValue("wkB/s"));
+					record.add(record.getValue("Device:")+".%util", record.getValue("%util"));
+					record.removeValue("rkB/s");
+					record.removeValue("wkB/s");
+					record.removeValue("%util");
+					record.removeValue("Device:");
+				}
+				
+				if (record.containsField("swap_free")) {
+					float swapUsedPcnt=0, swapUsed=0, swapTotal=0;
+					swapUsed=Long.parseLong(record.getValue("swap_used"));
+					swapTotal=Long.parseLong(record.getValue("swap_total"));
+					swapUsedPcnt=swapUsed/swapTotal*100;
+					record.add("swap_used_pcnt", "" + swapUsedPcnt);
+					record.add("csource", record.getValue("csource"));
+				}
+				
+				if (record.containsField("mem_used")) {
+					double memUsedPcnt=0, memTotal=0, memUsed=0;
+					memTotal=Double.parseDouble(record.getValue("mem_total"));
+					memUsed=Double.parseDouble(record.getValue("mem_used"));
+					memUsedPcnt=memUsed/memTotal*100;
+					record.add("mem_used_pcnt", "" + memUsedPcnt);
+					record.add("csource", record.getValue("csource"));
+				}
+				
+				if (record.containsField("mem_buffers")) {
+					double memBuffersPcnt=0, memTotal=0, memBuffers=0;
+					memTotal=Double.parseDouble(record.getValue("mem_total"));
+					memBuffers=Double.parseDouble(record.getValue("mem_buffers"));
+					memBuffersPcnt=memBuffers/memTotal*100;
+					record.add("mem_buffers_pcnt", "" + memBuffersPcnt);
+					record.add("csource", record.getValue("csource"));
+				}
+						
+				// Copy over all fields
+				String[] fields = record.getFields();
+				for(String f: fields){
+				  newRecord.add(f, record.getValue(f));
+				}
+			}
+			newRecord.add("capp", "systemMetrics");
+			output.collect(key, newRecord);   
+		} catch (IOException e) {
+			log.warn("Unable to collect output in SystemMetricsReduceProcessor [" + key + "]", e);
+			e.printStackTrace();
+		}
+
+	}
+}
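
As a quick worked example of the derived percentages above (numbers are illustrative, not taken from real data): with mem_total = 16000000 and mem_used = 4000000, mem_used_pcnt = 4000000 / 16000000 * 100 = 25.0; and with rxbyt/s = 64000000, txbyt/s = 32000000 and the hard-coded netSpeed of 128000000, the interface busy percentage is (64000000 / 128000000 * 100) + (32000000 / 128000000 * 100) = 75.0.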

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/UnknownReduceTypeException.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+public class UnknownReduceTypeException extends Exception
+{
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 5760553864088487836L;
+
+	public UnknownReduceTypeException()
+	{
+	}
+
+	public UnknownReduceTypeException(String message)
+	{
+		super(message);
+	}
+
+	public UnknownReduceTypeException(Throwable cause)
+	{
+		super(cause);
+	}
+
+	public UnknownReduceTypeException(String message, Throwable cause)
+	{
+		super(message, cause);
+	}
+
+}

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java Fri Dec  5 12:30:14 2008
@@ -67,6 +67,12 @@
 		return this.mapFields.containsKey(field);
 	}
 
+	public void removeValue(String field) {
+		if(this.mapFields.containsKey(field)) {
+			this.mapFields.remove(field);
+		}
+	}
+	
 	@Override
 	public String toString()
 	{
@@ -76,36 +82,37 @@
 		Map.Entry<String,Buffer> entry = null;
 		StringBuilder sb = new StringBuilder();
 		sb.append("<event  ");
-		String body = null;
+		StringBuilder body = new StringBuilder();
+		
 		String key = null;
 		String val = null;
-		
+		boolean hasBody = false;
+		String bodyVal = null;
 		while (it.hasNext())
 		{
 			entry = it.next();
 			key = entry.getKey().intern();
 			val = new String(entry.getValue().get());
-			if (key == Record.rawField.intern())
+			if (key.intern() == Record.bodyField.intern())
 			{
-				continue;
-			}
-			
-			if (key == Record.bodyField.intern())
-			{
-				body = val;
+				hasBody = true;
+				bodyVal = val;
 			}
 			else
 			{
-				sb.append(entry.getKey()).append("=\"").append(val).append("\" ");
+				sb.append(key).append("=\"").append(val).append("\" ");
+				body.append(key).append( " = ").append(val).append("<br>");
 			}
+			
+			
 		}
-		sb.append(">").append(body);
+		if (hasBody)	
+		{ sb.append(">").append(bodyVal);}
+		else
+		{ sb.append(">").append(body);}
 		sb.append("</event>");
 		
 		return sb.toString();
-//		//<event start="Jun 15 2008 00:00:00" end="Jun 15 2008 12:00:00" title="hello" link="/here">body</event>
-//		return 	"<event start=\"" + formatter.format(new Date(this.getTime())) + "\" title=\""
-//		+  this.getValue(Record.sourceField)   + "\" >" + this.getValue(Record.bodyField) + "</event>" ;
 	}
 
 	

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,212 @@
+// File generated by hadoop record compiler. Do not edit.
+package org.apache.hadoop.chukwa.extraction.engine;
+
+public class ChukwaRecordKey extends org.apache.hadoop.record.Record {
+  private static final org.apache.hadoop.record.meta.RecordTypeInfo _rio_recTypeInfo;
+  private static org.apache.hadoop.record.meta.RecordTypeInfo _rio_rtiFilter;
+  private static int[] _rio_rtiFilterFields;
+  static {
+    _rio_recTypeInfo = new org.apache.hadoop.record.meta.RecordTypeInfo("ChukwaRecordKey");
+    _rio_recTypeInfo.addField("reduceType", org.apache.hadoop.record.meta.TypeID.StringTypeID);
+    _rio_recTypeInfo.addField("key", org.apache.hadoop.record.meta.TypeID.StringTypeID);
+  }
+  
+  private String reduceType;
+  private String key;
+  public ChukwaRecordKey() { }
+  public ChukwaRecordKey(
+    final String reduceType,
+    final String key) {
+    this.reduceType = reduceType;
+    this.key = key;
+  }
+  public static org.apache.hadoop.record.meta.RecordTypeInfo getTypeInfo() {
+    return _rio_recTypeInfo;
+  }
+  public static void setTypeFilter(org.apache.hadoop.record.meta.RecordTypeInfo rti) {
+    if (null == rti) return;
+    _rio_rtiFilter = rti;
+    _rio_rtiFilterFields = null;
+  }
+  private static void setupRtiFields()
+  {
+    if (null == _rio_rtiFilter) return;
+    // we may already have done this
+    if (null != _rio_rtiFilterFields) return;
+    int _rio_i, _rio_j;
+    _rio_rtiFilterFields = new int [_rio_rtiFilter.getFieldTypeInfos().size()];
+    for (_rio_i=0; _rio_i<_rio_rtiFilterFields.length; _rio_i++) {
+      _rio_rtiFilterFields[_rio_i] = 0;
+    }
+    java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_itFilter = _rio_rtiFilter.getFieldTypeInfos().iterator();
+    _rio_i=0;
+    while (_rio_itFilter.hasNext()) {
+      org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfoFilter = _rio_itFilter.next();
+      java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_it = _rio_recTypeInfo.getFieldTypeInfos().iterator();
+      _rio_j=1;
+      while (_rio_it.hasNext()) {
+        org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfo = _rio_it.next();
+        if (_rio_tInfo.equals(_rio_tInfoFilter)) {
+          _rio_rtiFilterFields[_rio_i] = _rio_j;
+          break;
+        }
+        _rio_j++;
+      }
+      _rio_i++;
+    }
+  }
+  public String getReduceType() {
+    return reduceType;
+  }
+  public void setReduceType(final String reduceType) {
+    this.reduceType=reduceType;
+  }
+  public String getKey() {
+    return key;
+  }
+  public void setKey(final String key) {
+    this.key=key;
+  }
+  public void serialize(final org.apache.hadoop.record.RecordOutput _rio_a, final String _rio_tag)
+  throws java.io.IOException {
+    _rio_a.startRecord(this,_rio_tag);
+    _rio_a.writeString(reduceType,"reduceType");
+    _rio_a.writeString(key,"key");
+    _rio_a.endRecord(this,_rio_tag);
+  }
+  private void deserializeWithoutFilter(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
+  throws java.io.IOException {
+    _rio_a.startRecord(_rio_tag);
+    reduceType=_rio_a.readString("reduceType");
+    key=_rio_a.readString("key");
+    _rio_a.endRecord(_rio_tag);
+  }
+  public void deserialize(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
+  throws java.io.IOException {
+    if (null == _rio_rtiFilter) {
+      deserializeWithoutFilter(_rio_a, _rio_tag);
+      return;
+    }
+    // if we're here, we need to read based on version info
+    _rio_a.startRecord(_rio_tag);
+    setupRtiFields();
+    for (int _rio_i=0; _rio_i<_rio_rtiFilter.getFieldTypeInfos().size(); _rio_i++) {
+      if (1 == _rio_rtiFilterFields[_rio_i]) {
+        reduceType=_rio_a.readString("reduceType");
+      }
+      else if (2 == _rio_rtiFilterFields[_rio_i]) {
+        key=_rio_a.readString("key");
+      }
+      else {
+        java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo> typeInfos = (java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo>)(_rio_rtiFilter.getFieldTypeInfos());
+        org.apache.hadoop.record.meta.Utils.skip(_rio_a, typeInfos.get(_rio_i).getFieldID(), typeInfos.get(_rio_i).getTypeID());
+      }
+    }
+    _rio_a.endRecord(_rio_tag);
+  }
+  public int compareTo (final Object _rio_peer_) throws ClassCastException {
+    if (!(_rio_peer_ instanceof ChukwaRecordKey)) {
+      throw new ClassCastException("Comparing different types of records.");
+    }
+    ChukwaRecordKey _rio_peer = (ChukwaRecordKey) _rio_peer_;
+    int _rio_ret = 0;
+    _rio_ret = reduceType.compareTo(_rio_peer.reduceType);
+    if (_rio_ret != 0) return _rio_ret;
+    _rio_ret = key.compareTo(_rio_peer.key);
+    if (_rio_ret != 0) return _rio_ret;
+    return _rio_ret;
+  }
+  public boolean equals(final Object _rio_peer_) {
+    if (!(_rio_peer_ instanceof ChukwaRecordKey)) {
+      return false;
+    }
+    if (_rio_peer_ == this) {
+      return true;
+    }
+    ChukwaRecordKey _rio_peer = (ChukwaRecordKey) _rio_peer_;
+    boolean _rio_ret = false;
+    _rio_ret = reduceType.equals(_rio_peer.reduceType);
+    if (!_rio_ret) return _rio_ret;
+    _rio_ret = key.equals(_rio_peer.key);
+    if (!_rio_ret) return _rio_ret;
+    return _rio_ret;
+  }
+  public Object clone() throws CloneNotSupportedException {
+    ChukwaRecordKey _rio_other = new ChukwaRecordKey();
+    _rio_other.reduceType = this.reduceType;
+    _rio_other.key = this.key;
+    return _rio_other;
+  }
+  public int hashCode() {
+    int _rio_result = 17;
+    int _rio_ret;
+    _rio_ret = reduceType.hashCode();
+    _rio_result = 37*_rio_result + _rio_ret;
+    _rio_ret = key.hashCode();
+    _rio_result = 37*_rio_result + _rio_ret;
+    return _rio_result;
+  }
+  public static String signature() {
+    return "LChukwaRecordKey(ss)";
+  }
+  public static class Comparator extends org.apache.hadoop.record.RecordComparator {
+    public Comparator() {
+      super(ChukwaRecordKey.class);
+    }
+    static public int slurpRaw(byte[] b, int s, int l) {
+      try {
+        int os = s;
+        {
+          int i = org.apache.hadoop.record.Utils.readVInt(b, s);
+          int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+          s+=(z+i); l-= (z+i);
+        }
+        {
+          int i = org.apache.hadoop.record.Utils.readVInt(b, s);
+          int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+          s+=(z+i); l-= (z+i);
+        }
+        return (os - s);
+      } catch(java.io.IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    static public int compareRaw(byte[] b1, int s1, int l1,
+                                   byte[] b2, int s2, int l2) {
+      try {
+        int os1 = s1;
+        {
+          int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1);
+          int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2);
+          int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+          int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+          s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+          int r1 = org.apache.hadoop.record.Utils.compareBytes(b1,s1,i1,b2,s2,i2);
+          if (r1 != 0) { return (r1<0)?-1:0; }
+          s1+=i1; s2+=i2; l1-=i1; l1-=i2;
+        }
+        {
+          int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1);
+          int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2);
+          int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+          int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+          s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+          int r1 = org.apache.hadoop.record.Utils.compareBytes(b1,s1,i1,b2,s2,i2);
+          if (r1 != 0) { return (r1<0)?-1:0; }
+          s1+=i1; s2+=i2; l1-=i1; l1-=i2;
+        }
+        return (os1 - s1);
+      } catch(java.io.IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    public int compare(byte[] b1, int s1, int l1,
+                         byte[] b2, int s2, int l2) {
+      int ret = compareRaw(b1,s1,l1,b2,s2,l2);
+      return (ret == -1)? -1 : ((ret==0)? 1 : 0);}
+  }
+  
+  static {
+    org.apache.hadoop.record.RecordComparator.define(ChukwaRecordKey.class, new Comparator());
+  }
+}
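
For reference, this generated class is the composite key the reduce processors above populate; a short usage sketch with made-up values:

    // Hypothetical values; MRJobReduceProcessor, for example, uses the job's SUBMIT_TIME as the key.
    ChukwaRecordKey newKey = new ChukwaRecordKey();
    newKey.setReduceType("MRJob");      // selects the ReduceProcessor on the reduce side
    newKey.setKey("1222732800000");     // e.g. a timestamp in milliseconds
    // or, equivalently, via the generated constructor:
    ChukwaRecordKey sameKey = new ChukwaRecordKey("MRJob", "1222732800000");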

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java Fri Dec  5 12:30:14 2008
@@ -22,10 +22,12 @@
 import java.util.TreeMap;
 
 
+
 public class ChukwaSearchResult implements SearchResult
 {
 	private TreeMap<Long, List<Record>> records;
-
+	private Token token = null;
+	
 	public TreeMap<Long, List<Record>> getRecords()
 	{
 		return records;
@@ -35,5 +37,15 @@
 	{
 		this.records = records;
 	}
+
+	public Token getToken()
+	{
+		return token;
+	}
+
+	public void setToken(Token token)
+	{
+		this.token = token;
+	}
 	
 }

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java Fri Dec  5 12:30:14 2008
@@ -29,7 +29,7 @@
 {
 	private DataSourceFactory dataSourceFactory = DataSourceFactory.getInstance();
 	
-	public SearchResult  search(String cluster,String[] dataSources,long t0,long t1,String filter)
+	public SearchResult  search(String cluster,String[] dataSources,long t0,long t1,String filter,Token token)
 	throws DataSourceException
 	{
 		SearchResult result = new ChukwaSearchResult();
@@ -40,7 +40,7 @@
 		for(int i=0;i<dataSources.length;i++)
 		{
 			DataSource ds = dataSourceFactory.getDataSource(dataSources[i]);
-			ds.search(result, cluster, dataSources[i], t0, t1, filter);
+			ds.search(result, cluster, dataSources[i], t0, t1, filter,token);
 		}
 		return result;
 	}

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Record.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Record.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Record.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Record.java Fri Dec  5 12:30:14 2008
@@ -21,17 +21,21 @@
 public interface Record
 {
 	public static final String bodyField = "body";
-	
+	public static final String sourceField = "csource";
+	public static final String applicationField = "capp";
+	public static final String tagsField = "ctags";
+  public static final String chunkDataField = "cchunkData";
+  public static final String chunkExceptionField = "cchunkException";
+  
+	public static final String classField = "class";
 	public static final String logLevelField = "logLevel";
-	public static final String destinationField = "dest";
-	public static final String dataSourceField = "ds";
-	public static final String sourceField = "src";
-	public static final String streamNameField = "sname";
-	public static final String typeField = "type";
-	public static final String classField = "pkg";
-	public static final String rawField = "raw";
 	
-	public static final String fieldSeparator = ":";
+//	public static final String streamNameField = "sname";
+//	public static final String typeField = "type";
+	
+//	public static final String rawField = "raw";
+	
+//	public static final String fieldSeparator = ":";
 	
 	public long getTime();
 	public void add(String key, String value);

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,23 @@
+package org.apache.hadoop.chukwa.extraction.engine;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class RecordUtil
+{
+	static  Pattern clusterPattern = Pattern.compile("(.*)?cluster=\"(.*?)\"(.*)?");
+	public static String getClusterName(Record record)
+	{
+		String tags = record.getValue(Record.tagsField);
+		if (tags!= null)
+		{
+			Matcher matcher = clusterPattern.matcher(tags);
+			if (matcher.matches())
+			{
+				return matcher.group(2);
+			}
+		}
+			
+		return "undefined";
+	}
+}
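
A short sketch of getClusterName in use, assuming a record whose ctags field carries a cluster="..." tag (the cluster name below is made up):

    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
    import org.apache.hadoop.chukwa.extraction.engine.Record;
    import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;

    public class RecordUtilExample {
        public static void main(String[] args) {
            ChukwaRecord tagged = new ChukwaRecord();
            tagged.add(Record.tagsField, "cluster=\"demo\" somethingElse=\"x\"");
            // clusterPattern captures the quoted value following cluster=
            System.out.println(RecordUtil.getClusterName(tagged)); // prints demo
        }
    }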

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchResult.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchResult.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchResult.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchResult.java Fri Dec  5 12:30:14 2008
@@ -23,8 +23,11 @@
 
 
 
+
 public interface SearchResult
 {
+	public void setToken(Token token);
+	public Token getToken();
 	public TreeMap<Long, List<Record>> getRecords();
 	public void setRecords(TreeMap<Long, List<Record>> records);
 }

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java Fri Dec  5 12:30:14 2008
@@ -23,6 +23,6 @@
 
 public interface SearchService
 {
-	public SearchResult search(String cluster,String[] dataSources,long t0,long t1,String filter)
+	public SearchResult search(String cluster,String[] dataSources,long t0,long t1,String filter,Token token)
 	throws DataSourceException;
 }

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Token.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Token.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Token.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Token.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,7 @@
+package org.apache.hadoop.chukwa.extraction.engine;
+
+public class Token
+{
+	public String key = null;
+	public boolean hasMore = false;
+}
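
Token is the new paging cursor threaded through SearchService and DataSource. A hedged sketch of how a caller might page through results with it (the loop and the default ChukwaSearchService constructor are assumptions; only the search signature comes from this commit):

    import org.apache.hadoop.chukwa.extraction.engine.ChukwaSearchService;
    import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
    import org.apache.hadoop.chukwa.extraction.engine.SearchService;
    import org.apache.hadoop.chukwa.extraction.engine.Token;

    public class PagedSearchSketch {
        public static void main(String[] args) throws Exception {
            SearchService service = new ChukwaSearchService();
            long t0 = 0L;
            long t1 = System.currentTimeMillis();
            Token token = null;                     // null on the first page
            do {
                SearchResult result = service.search("demo-cluster",
                        new String[] { "SystemMetrics" }, t0, t1, null, token);
                // consume result.getRecords() here
                token = result.getToken();          // carries the resume position
            } while (token != null && token.hasMore);
        }
    }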

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java Fri Dec  5 12:30:14 2008
@@ -19,6 +19,7 @@
 package org.apache.hadoop.chukwa.extraction.engine.datasource;
 
 import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
+import org.apache.hadoop.chukwa.extraction.engine.Token;
 
 
 
@@ -28,7 +29,8 @@
 	public SearchResult
 		search(	SearchResult result,String cluster,String dataSource,
 				long t0,long t1,
-				String filter)
+				String filter,
+				Token token)
 		throws DataSourceException;
 	public boolean isThreadSafe();
 	

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSourceFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSourceFactory.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSourceFactory.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSourceFactory.java Fri Dec  5 12:30:14 2008
@@ -21,7 +21,7 @@
 import java.util.HashMap;
 
 import org.apache.hadoop.chukwa.extraction.engine.datasource.database.DatabaseDS;
-import org.apache.hadoop.chukwa.extraction.engine.datasource.record.RecordDS;
+import org.apache.hadoop.chukwa.extraction.engine.datasource.record.ChukwaRecordDataSource;
 
 public class DataSourceFactory
 {
@@ -37,10 +37,7 @@
 		dataSources.put("MRJob", databaseDS);
 		dataSources.put("HodJob", databaseDS);
 		dataSources.put("QueueInfo", databaseDS);
-		
-		DataSource recordDS = new RecordDS();
-		dataSources.put("NameNode", recordDS);
-		dataSources.put("ChukwaLocalAgent", recordDS);
+	
 	}
 	
 	public static DataSourceFactory getInstance()
@@ -64,7 +61,7 @@
 		}
 		else
 		{
-			DataSource hsdfsDS = new RecordDS();
+			DataSource hsdfsDS = new ChukwaRecordDataSource();
 			dataSources.put(datasourceName, hsdfsDS);
 			return hsdfsDS;
 			//TODO proto only!

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java Fri Dec  5 12:30:14 2008
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -44,15 +45,20 @@
 	private DsDirectory()
 	{
 		dataConfig = new DataConfig();
-		conf = new Configuration();
+		conf = new ChukwaConfiguration();
 		try
 		{
 			fs = FileSystem.get(conf);
-		} catch (IOException e)
+		} 
+		catch (IOException e)
 		{
 			e.printStackTrace();
 		}
 		rootFolder = dataConfig.get("chukwa.engine.dsDirectory.rootFolder");
+		if (!rootFolder.endsWith("/"))
+		{
+			rootFolder +="/";
+		}
 	}
 	
 	public static DsDirectory getInstance()
@@ -94,7 +100,7 @@
 	public static void main(String[] args) throws DataSourceException
 	{
 		DsDirectory dsd = DsDirectory.getInstance();
-		String[] dss = dsd.list("localhost");
+		String[] dss = dsd.list("unknown");
 		for (String d : dss)
 		{
 			System.out.println(d);

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java Fri Dec  5 12:30:14 2008
@@ -34,15 +34,16 @@
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
 import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
+import org.apache.hadoop.chukwa.extraction.engine.Token;
 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
-import org.apache.hadoop.chukwa.hicc.ClusterConfig;
+//import org.apache.hadoop.chukwa.hicc.ClusterConfig;
 
 public class DatabaseDS implements DataSource
 {
 		
 	public SearchResult search(SearchResult result, String cluster,
-			String dataSource, long t0, long t1, String filter)
+			String dataSource, long t0, long t1, String filter,Token token)
 			throws DataSourceException
 	{
 		SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd kk:mm:ss");
@@ -79,8 +80,8 @@
 	    	String dateclause = timeField + " >= '" + startS 
 	    		+ "' and " + timeField + " <= '" + endS + "'";
 	    	
-		       ClusterConfig cc = new ClusterConfig();
-		       String jdbc = cc.getURL(cluster);
+		       //ClusterConfig cc = new ClusterConfig();
+		       String jdbc = ""; //cc.getURL(cluster);
 		       
 			   Connection conn = DriverManager.getConnection(jdbc);
 			   

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,21 @@
+package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
+
+import java.util.List;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+
+public class ChukwaDSInternalResult
+{
+	List<Record> records = null;
+	String day = null;
+	int hour = 0;
+	int rawIndex = 0;
+	int spill = 1;
+	long position = -1;
+	long currentTs = -1;
+	
+	String fileName = null;
+	
+	ChukwaRecordKey key = null;
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,520 @@
+package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaSearchResult;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
+import org.apache.hadoop.chukwa.extraction.engine.Token;
+import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
+import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+
+public class ChukwaRecordDataSource implements DataSource
+{
+	//TODO need some cleanup after 1st production
+	// First implementation to get it working with the new directory structure
+	
+	static Logger log = Logger.getLogger(ChukwaRecordDataSource.class);
+	
+	private static final int dayFolder = 100;
+	private static final int hourFolder = 200;
+	private static final int rawFolder = 300;
+	
+	static final String[] raws = {"0","5","10","15","20","25","30","35","40","45","50","55"};
+	
+	private static FileSystem fs = null;
+	private static ChukwaConfiguration conf = null;
+	
+	private static String rootDsFolder = null;
+	private static DataConfig dataConfig = null;
+	
+	static
+	{
+		dataConfig = new DataConfig();
+		rootDsFolder = dataConfig.get("chukwa.engine.dsDirectory.rootFolder");
+		conf = new ChukwaConfiguration();
+		try
+		{
+			fs = FileSystem.get(conf);
+		} catch (IOException e)
+		{
+			e.printStackTrace();
+		}
+	}
+	
+	@Override
+	public boolean isThreadSafe()
+	{
+		return true;
+	}
+
+
+	@Override
+	public SearchResult search(SearchResult result, String cluster,
+			String dataSource, long t0, long t1, String filter,Token token)
+			throws DataSourceException
+	{
+		String filePath = rootDsFolder + "/" + cluster + "/";
+
+		log.debug("filePath [" + filePath + "]");	
+		Calendar calendar = Calendar.getInstance();
+		calendar.setTimeInMillis(t0);
+		SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
+		int maxCount = 200;
+		
+		List<Record> records = new ArrayList<Record>();
+
+		ChukwaDSInternalResult res =  new ChukwaDSInternalResult();
+		
+		if (token != null)
+		{
+			// token.key = day + "|" + hour + "|" + raw + "|" + spill + "|" + res.currentTs + "|"+ res.position + "|"+ res.fileName;
+			try
+			{
+				String[] vars = token.key.split("\\|");
+				res.day = vars[0];
+				res.hour = Integer.parseInt(vars[1]);
+				res.rawIndex = Integer.parseInt(vars[2]);
+				res.spill = Integer.parseInt(vars[3]);
+				res.currentTs = Long.parseLong(vars[4]);
+				res.position = Long.parseLong(vars[5]);
+				res.fileName = vars[6];
+				log.info("Token is not null! :" + token.key);
+			}
+			catch(Exception e)
+			{
+				log.error("Invalid key: [" + token.key + "] exception: ", e);
+			}
+		}
+		else
+		{
+			log.debug("Token is null!");
+		}
+		
+		try
+		{
+			do
+			{
+				log.debug("start Date [" + calendar.getTime() + "]");
+				String workingDay = sdf.format(calendar.getTime());
+				int workingHour = calendar.get(Calendar.HOUR_OF_DAY);
+				int startRawIndex = 0;
+				if (token !=null)
+				{
+					workingDay = res.day ;
+					workingHour = res.hour;
+					startRawIndex = res.rawIndex;
+				}
+				else
+				{
+					token = new Token();
+				}
+				
+				log.debug("workingDay " + workingDay);
+				log.debug("workingHour " + workingHour);
+				
+				if (exist(dayFolder,filePath,dataSource,workingDay,null,null))
+				{
+					// Extract Data for Day
+					if (containsRotateFlag(dayFolder,filePath,dataSource,workingDay,null))
+					{
+						// read data from day
+						// SystemMetrics/20080922/SystemMetrics_20080922.1.evt
+						log.debug("fs.exists(workingDayRotatePath) ");
+						extractRecords(res,ChukwaRecordDataSource.dayFolder,filePath,dataSource,workingDay, null, -1,
+								 token, records, maxCount, t0, t1, filter);
+						maxCount = maxCount - records.size();
+						if ( (maxCount <= 0) || (res.currentTs > t1))
+						 { break; }
+						
+					} // End process Day File
+					else // check for hours
+					{
+						log.debug("check for hours");
+						for (int hour = 0; hour<24;hour ++)
+						{
+							if ( workingDay == res.day && hour<workingHour)
+							{
+								continue;
+							}
+							log.debug(" Hour?  -->" + filePath + dataSource + "/"+ workingDay+ "/" + hour);
+							if (exist(dayFolder,filePath,dataSource,workingDay,""+hour,null))
+							{
+								if (containsRotateFlag(dayFolder,filePath,dataSource,workingDay,""+hour))
+								{
+									// read data from Hour
+									// SystemMetrics/20080922/12/SystemMetrics_20080922_12.1.evt
+									extractRecords(res,ChukwaRecordDataSource.hourFolder,filePath,dataSource,workingDay, ""+hour, -1,
+											 token, records, maxCount, t0, t1, filter);
+								}
+								else // check for raw
+								{
+									log.debug("Working on Raw");
+									
+									for(int rawIndex=startRawIndex;rawIndex<12;rawIndex++)
+									{
+										// read data from Raw
+										// SystemMetrics/20080922/0/25/SystemMetrics_20080922_0_25.1.evt
+										if (exist(dayFolder,filePath,dataSource,workingDay,""+hour,raws[rawIndex]))
+										{
+											extractRecords(res,ChukwaRecordDataSource.rawFolder,filePath,dataSource,workingDay, ""+hour, rawIndex,
+													 token, records, maxCount, t0, t1, filter);
+											maxCount = maxCount - records.size();
+											if ( (maxCount <= 0) || (res.currentTs > t1))
+											 { break; }
+										}
+										else
+										{
+											log.debug("<<<<<<<<<Working on Raw Not exist--> "
+													+ filePath + dataSource + "/" + workingDay+ "/" + workingHour + "/" + raws[rawIndex] );
+										}
+										res.spill = 1;
+									}
+								}
+							} // End if (fs.exists(new Path(filePath + workingDay+ "/" + hour)))
+							 
+							maxCount = maxCount - records.size();
+							if ( (maxCount <= 0) || (res.currentTs > t1))
+							 { break; }
+
+						} // End process all Hourly/raw files
+					}
+				}	
+				
+				maxCount = maxCount - records.size();
+				if ( (maxCount <= 0) || (res.currentTs > t1))
+				 { break; }
+			
+				// move to the next day
+				calendar.add(Calendar.DAY_OF_MONTH, +1);
+				calendar.set(Calendar.HOUR_OF_DAY, 0);
+				calendar.set(Calendar.MINUTE, 0);
+				calendar.set(Calendar.SECOND, 0);
+				
+			}
+			while (calendar.getTimeInMillis() < t1); 
+			
+		}
+		catch(Exception e)
+		{
+			e.printStackTrace();
+			throw new DataSourceException(e);
+		}
+	
+		TreeMap<Long, List<Record>> recordsInResult = result.getRecords();
+		for (Record record : records)
+		{
+			long timestamp = record.getTime();
+			if (recordsInResult.containsKey(timestamp))
+			   {
+				recordsInResult.get(timestamp).add(record);
+			   }
+			   else
+			   {
+				   List<Record> list = new LinkedList<Record>();
+				   list.add(record);
+				   recordsInResult.put(timestamp, list);
+			   }   
+		}
+		result.setToken(token);
+		return result;
+
+	}
+
+	public void extractRecords(ChukwaDSInternalResult res,int directoryType,String rootFolder,String dataSource,String day,String hour,int rawIndex,
+								Token token,List<Record> records,int maxRows,long t0,long t1,String filter) throws Exception
+	{
+		// for each spill file
+		// extract records
+		int spill = res.spill;
+		
+		boolean workdone = false;
+		do
+		{
+			String fileName = buildFileName(directoryType,rootFolder,dataSource,spill,day,hour,rawIndex);
+			log.debug("extractRecords : " + fileName);
+			
+			if (fs.exists(new Path(fileName)))
+			{
+				 readData(res,token,fileName,maxRows,t0,t1,filter);
+				 res.spill = spill;
+				 List<Record> localRecords = res.records;
+				log.debug("localRecords size : " + localRecords.size());
+				maxRows = maxRows - localRecords.size();
+				if (maxRows <= 0)
+				{
+					workdone = true;
+				}
+				records.addAll(localRecords);
+				log.debug("AFTER fileName  [" +fileName + "] count=" + localRecords.size() + " maxCount=" + maxRows);
+				spill ++;
+			}
+			else
+			{
+				// no more spill
+				workdone = true;
+			}
+		}
+		while(!workdone);
+		token.key = day + "|" + hour + "|" + rawIndex + "|" + spill + "|" + res.currentTs + "|"+ res.position + "|" + res.fileName;
+	}
+	
+	
+	public  void readData(ChukwaDSInternalResult res,Token token,String fileName,int maxRows,long t0, long t1,String filter) throws
+			Exception
+	{
+		List<Record> records = new LinkedList<Record>();
+		res.records = records;	
+		SequenceFile.Reader r= null;
+		if (filter != null)
+			{ filter = filter.toLowerCase();}
+		
+		try
+		{
+			
+			if (!fs.exists(new Path(fileName)))
+			{
+				log.debug("fileName not there!");
+				return;
+			}
+			log.debug("Parser Open [" +fileName + "]");
+			
+			long timestamp = 0;
+			int listSize = 0;
+			ChukwaRecordKey key = new ChukwaRecordKey();
+		    ChukwaRecord record = new ChukwaRecord();
+		    
+			r= new SequenceFile.Reader(fs, new Path(fileName), conf);
+			
+			log.debug("readData Open2 [" +fileName + "]");
+			if ( (fileName.equals(res.fileName)) && (res.position != -1))
+			{
+				r.seek(res.position);
+			}
+			res.fileName = 	fileName;
+			
+			while(r.next(key, record))
+			{	
+				if (record != null)
+				{
+					res.position = r.getPosition();
+					
+					timestamp = record.getTime();
+					res.currentTs = timestamp;
+					log.debug("\nSearch for startDate: " + new Date(t0) + " is :" + new Date(timestamp));
+					
+					if (timestamp < t0) 
+					{
+						 //log.debug("Line not in range. Skipping: " +record);
+						 continue;
+					} 
+					else if (timestamp < t1) 
+					{
+						log.debug("In Range: " + record.toString());
+						boolean valid = false;
+						
+						 if ( (filter == null || filter.equals("") ))
+						 {
+							 valid = true;
+						 }
+						 else if ( isValid(record,filter))
+						 {
+							 valid = true;
+						 }
+						 
+						 if (valid)
+						 {
+							records.add(record);
+							record = new ChukwaRecord();
+							listSize = records.size();
+							if (listSize >= maxRows)
+							{
+								// maxRow so stop here
+								//Update token
+								token.key = key.getKey();
+								token.hasMore = true;
+								break;
+							}
+						 }
+						else 
+						{
+							log.debug("In Range ==================>>>>>>>>> OUT Regex: " + record);
+						}
+					}
+					else
+					{
+						 log.debug("Line out of range. Stopping now: " +record);
+						 // Update Token
+						 token.key = key.getKey();
+						 token.hasMore = false;
+						 break;
+					}
+				}
+			}
+			
+		}
+		catch(Exception e)
+		{
+			e.printStackTrace();
+		}
+		finally
+		{
+			try
+			{
+				r.close();
+			}
+			catch(Exception e){}
+		}
+	}
+	
+	public boolean containsRotateFlag(int directoryType,String rootFolder,String dataSource,String workingDay,String workingHour) throws Exception
+	{
+		boolean contains = false;
+		switch(directoryType)
+		{
+			case	ChukwaRecordDataSource.dayFolder:
+				//	SystemMetrics/20080922/rotateDone	
+				contains = fs.exists(new Path( rootFolder + dataSource + "/" + workingDay+"/rotateDone"));
+			break;
+			
+			case	ChukwaRecordDataSource.hourFolder:
+				// SystemMetrics/20080922/12/rotateDone
+				contains =  fs.exists(new Path( rootFolder + dataSource + "/" + workingDay+ "/" + workingHour +"/rotateDone"));
+			break;
+			
+		}
+		return contains;
+	}
+	
+	public boolean exist(int directoryType,String rootFolder,String dataSource,String workingDay,String workingHour,String raw) throws Exception
+	{
+		boolean contains = false;
+		switch(directoryType)
+		{
+			case	ChukwaRecordDataSource.dayFolder:
+				//	SystemMetrics/20080922
+				contains = fs.exists(new Path( rootFolder + dataSource + "/" + workingDay));
+			break;
+			
+			case	ChukwaRecordDataSource.hourFolder:
+				// SystemMetrics/20080922/12
+				contains =  fs.exists(new Path( rootFolder + dataSource + "/" + workingDay+ "/" + workingHour ));
+			break;
+			case	ChukwaRecordDataSource.rawFolder:
+				// SystemMetrics/20080922/12/25
+				contains =  fs.exists(new Path( rootFolder + dataSource + "/" + workingDay+ "/" + workingHour + "/" + raw));
+			break;
+			
+		}
+		return contains;
+	}
+	
+	
+	protected boolean isValid(ChukwaRecord record, String filter)
+	{
+		String[] fields = record.getFields();
+		for(String field: fields)
+		{
+			if ( record.getValue(field).toLowerCase().indexOf(filter) >= 0)
+			{
+				return true;
+			}
+		}
+		return false;
+	}
+	
+	public String buildFileName(int directoryType,String rootFolder,String dataSource,int spill,String day,String hour,int rawIndex)
+	{
+		String fileName = null;
+		// TODO use StringBuilder
+		// TODO revisit the way we're building fileName
+		
+		switch(directoryType)
+		{
+			case	ChukwaRecordDataSource.dayFolder:
+				//	SystemMetrics/20080922/SystemMetrics_20080922.1.evt	
+				fileName = rootFolder + "/" + dataSource + "/" + day + "/" 
+					+ dataSource + "_" +  day + "." + spill + ".evt";
+			break;
+			
+			case	ChukwaRecordDataSource.hourFolder:
+				// SystemMetrics/20080922/12/SystemMetrics_20080922_12.1.evt
+				fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/" 
+					+ dataSource + "_" +  day + "_" + hour + "." + spill + ".evt";
+			break;
+			
+			case	ChukwaRecordDataSource.rawFolder:
+				// SystemMetrics/20080922/0/25/SystemMetrics_20080922_0_25.1.evt	
+				fileName = rootFolder + "/" + dataSource + "/" + day + "/" + hour + "/"  + raws[rawIndex] + "/"
+					+ dataSource + "_" +  day + "_" + hour + "_" + raws[rawIndex] + "." + spill + ".evt";
+			break;
+		}
+		log.debug("buildFileName :" + fileName);
+		return fileName;
+	}
+	
+	public static void main(String[] args) throws DataSourceException
+	{
+		ChukwaRecordDataSource ds = new ChukwaRecordDataSource();
+		SearchResult result = new ChukwaSearchResult();
+		result.setRecords( new TreeMap<Long,List<Record>>());
+		String cluster = args[0];
+		String dataSource = args[1];
+		long t0 = Long.parseLong(args[2]);
+		long t1 = Long.parseLong(args[3]);
+		String filter = null;
+		Token token = null;
+		
+		if (args.length >= 5 && !args[4].equalsIgnoreCase("null"))
+		{
+			filter = args[4];
+		}
+		if (args.length == 6)
+		{
+			token = new Token();
+			token.key = args[5];
+			System.out.println("token :" + token.key);
+		}
+		
+		System.out.println("cluster :" + cluster);
+		System.out.println("dataSource :" + dataSource);
+		System.out.println("t0 :" + t0);
+		System.out.println("t1 :" + t1);
+		System.out.println("filter :" +filter );
+		
+		
+		ds.search(result, cluster, dataSource, t0, t1, filter,token);
+		TreeMap<Long, List<Record>> records = result.getRecords();
+		Iterator<Long> it = records.keySet().iterator();
+		
+		while(it.hasNext())
+		{
+			long ts = it.next();
+			System.out.println("\n\nTimestamp: " + new Date(ts));
+			List<Record> list = records.get(ts);
+			for (int i=0;i<list.size();i++)
+			{
+				System.out.println(list.get(i));
+			}
+		}
+		
+		if (result.getToken() != null)
+		 { System.out.println("Key -->" + result.getToken().key);}
+	}
+}
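
For illustration only, a minimal sketch of paging through this data source from code rather than through its main(). It relies solely on the calls visible above (search(), SearchResult.getRecords()/getToken(), the public Token fields); the cluster name, data-source name and time window are placeholder values, and the class is assumed to sit in the same package as ChukwaRecordDataSource:

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.chukwa.extraction.engine.ChukwaSearchResult;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
import org.apache.hadoop.chukwa.extraction.engine.Token;
import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;

// Assumes this class lives alongside ChukwaRecordDataSource (same package).
public class RecordPager
{
	public static void main(String[] args) throws DataSourceException
	{
		// Placeholder values: cluster, data source and a one-hour window ending now.
		String cluster = "demoCluster";
		String dataSource = "SystemMetrics";
		long t1 = System.currentTimeMillis();
		long t0 = t1 - (60L * 60L * 1000L);

		ChukwaRecordDataSource ds = new ChukwaRecordDataSource();
		Token token = null;
		do
		{
			SearchResult result = new ChukwaSearchResult();
			result.setRecords(new TreeMap<Long, List<Record>>());
			ds.search(result, cluster, dataSource, t0, t1, null, token);

			for (Map.Entry<Long, List<Record>> entry : result.getRecords().entrySet())
			{
				for (Record record : entry.getValue())
				{
					System.out.println(new Date(entry.getKey()) + " " + record);
				}
			}
			// The token key encodes day|hour|rawIndex|spill|timestamp|position|fileName,
			// so passing it back into search() resumes the scan where the last page stopped.
			token = result.getToken();
		}
		while (token != null && token.hasMore);
	}
}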

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java Fri Dec  5 12:30:14 2008
@@ -19,18 +19,17 @@
 package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
 
 import java.io.IOException;
-import java.util.Calendar;
+import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Date;
 
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 
 public class ChukwaSequenceFileParser
 {
@@ -67,60 +66,59 @@
 			long offset = 0;
 			
 //			HdfsWriter.HdfsWriterKey key = new HdfsWriter.HdfsWriterKey();
-			Text key = new Text();
-
-		    ChukwaRecord evt = new ChukwaRecord();
-			while(r.next(key, evt))
+			ChukwaRecordKey key = new ChukwaRecordKey();
+		    ChukwaRecord record = new ChukwaRecord();
+		    
+			while(r.next(key, record))
 			{	
 				lineCount ++;
 				
-				System.out.println("NameNodeParser Line [" +evt.getValue(Record.bodyField) + "]");	
+				System.out.println("NameNodeParser Line [" +record.getValue(Record.bodyField) + "]");	
 				
-				if (evt != null)
+				if (record != null)
 				{
-					timestamp = evt.getTime();
+					timestamp = record.getTime();
 					if (timestamp < t0) 
 					{
-						 System.out.println("Line not in range. Skipping: " +evt.getValue(Record.bodyField));
+						 System.out.println("Line not in range. Skipping: " +record.getValue(Record.bodyField));
 						 System.out.println("Search for: " + new Date(t0) + " is :" + new Date(timestamp));
 						 continue;
 					} 
 					else if ((timestamp < t1) && (offset < maxOffset )) //JB (epochTS < maxDate)
 					{
 						
-						System.out.println("In Range: " + evt.getValue(Record.bodyField));
+						System.out.println("In Range: " + record.getValue(Record.bodyField));
 						boolean valid = false;
 						
 						 if ( (filter == null || filter.equals("") ))
 						 {
 							 valid = true;
 						 }
-						 else if (evt.getValue(Record.rawField).toLowerCase().indexOf(filter) > 0)
-						   {
- System.out.println("MATCH " +  filter + "===========================>>>>>>>" + evt.getValue(Record.rawField));
-							   valid = true;
-						   }
+						 else if ( isValid(record,filter))
+						 {
+							 valid = true;
+						 }
 						 
 						 if (valid)
 						 {
-							records.add(evt);
-evt = new ChukwaRecord();
+							records.add(record);
+							record = new ChukwaRecord();
 							listSize = records.size();
 							if (listSize > maxRows)
 							{
 								records.remove(0);
-								System.out.println("==========>>>>>REMOVING: " + evt.getValue(Record.bodyField));
+								System.out.println("==========>>>>>REMOVING: " + record.getValue(Record.bodyField));
 							}
 						 }
 						else 
 						{
-							System.out.println("In Range ==================>>>>>>>>> OUT Regex: " + evt.getValue(Record.bodyField));
+							System.out.println("In Range ==================>>>>>>>>> OUT Regex: " + record.getValue(Record.bodyField));
 						}
 
 					}
 					else
 					{
-						 System.out.println("Line out of range. Stopping now: " +evt.getValue(Record.bodyField));
+						 System.out.println("Line out of range. Stopping now: " +record.getValue(Record.bodyField));
 						break;
 					}
 				}
@@ -146,26 +144,17 @@
 		return records;
 	}
 	
-	public static void main(String[] args) throws Throwable
+	
+	protected static boolean isValid(ChukwaRecord record, String filter)
 	{
-		Configuration conf = new Configuration();
-
-	    FileSystem fs = FileSystem.get(conf);//FileSystem.get(new URI(fsURL), conf);
-	    Calendar c = Calendar.getInstance();
-	    c.add(Calendar.MONTH, -2);
-	    
-	    ChukwaSequenceFileParser.readData(	"/tmp/t1", "NameNode",
-	    									200, new java.util.Date().getTime(), 
-	    									c.getTimeInMillis(), Long.MAX_VALUE, null, 
-	    									args[0], fs, conf);
-	    
-	    SequenceFile.Reader r= new SequenceFile.Reader(fs, new Path(args[0]), conf);
-	    Text key = new Text();
-	    
-	    ChukwaRecord evt = new ChukwaRecord();
-	    while(r.next(key, evt))
-	    {
-	      System.out.println( evt);
-	    }
+		String[] fields = record.getFields();
+		for(String field: fields)
+		{
+			if ( record.getValue(field).toLowerCase().indexOf(filter) >= 0)
+			{
+				return true;
+			}
+		}
+		return false;
 	}
 }

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/RecordDS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/RecordDS.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/RecordDS.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/RecordDS.java Fri Dec  5 12:30:14 2008
@@ -19,20 +19,19 @@
 package org.apache.hadoop.chukwa.extraction.engine.datasource.record;
 
 import java.io.IOException;
-import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Calendar;
-import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.TreeMap;
 
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaSearchResult;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
 import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
+import org.apache.hadoop.chukwa.extraction.engine.Token;
 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
 import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
 import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
-import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.fs.FileSystem;
 
 public class RecordDS implements DataSource
@@ -64,7 +63,7 @@
 									String dataSource, 
 									long t0, 
 									long t1, 
-									String filter)
+									String filter,Token token)
 			throws DataSourceException
 	{
 		
@@ -77,20 +76,20 @@
 		
 		TreeMap<Long, List<Record>> records = result.getRecords();
 		int maxCount = 200;
-		
+		SimpleDateFormat sdf = new java.text.SimpleDateFormat("_yyyyMMdd_HH_");
 		do
 		{
 			System.out.println("start Date [" + calendar.getTime() + "]");
-			String fileName = new java.text.SimpleDateFormat("_yyyy_MM_dd_HH").format(calendar.getTime());
+			String fileName = sdf.format(calendar.getTime());
 			int minutes = calendar.get(Calendar.MINUTE);
 			int dec = minutes/10;
-			fileName += "_" + dec ;
+			fileName +=  dec ;
 			
 			int m = minutes - (dec*10);
 			if (m < 5)
-			{ fileName += "0.evt";}
+			{ fileName += "0.1.evt";}
 			else
-			{ fileName += "5.evt";}
+			{ fileName += "5.1.evt";}
 			
 			fileName = filePath + "/" + dataSource + fileName;
 			
@@ -145,39 +144,6 @@
 		
 		return result;
 	}
-
-	
-	public static void main(String[] args) throws DataSourceException
-	{
-		long t1 = 0;
-		long t0 = 0;
-		System.out.println("Hello");
-		Calendar calendar = Calendar.getInstance();
-		Date d1;
-		try
-		{
-			d1 = new java.text.SimpleDateFormat ("dd/MM/yyyy HH:mm:ss").parse("05/06/2008 19:31:05");
-			calendar.setTime(d1);
-			t1 = calendar.getTimeInMillis();
-			d1 = new java.text.SimpleDateFormat ("dd/MM/yyyy HH:mm:ss").parse("05/06/2008 19:26:05");
-			calendar.setTime(d1);
-			t0 = calendar.getTimeInMillis();
-			
-		} catch (ParseException e)
-		{
-			e.printStackTrace();
-			throw new RuntimeException(e);
-		}
-		
-		String filter = null;
-		RecordDS dao = new RecordDS();
-		SearchResult result = new ChukwaSearchResult();
-		
-		TreeMap<Long, List<Record>> records = new TreeMap<Long,List<Record>> ();
-		result.setRecords(records);
-		
-		dao.search(result,"output2","NameNode",t0,t1,filter);
-	}
 	
 	public boolean isThreadSafe()
 	{

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Chart.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Chart.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Chart.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Chart.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.hicc;
+
+import java.io.PrintWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.TreeMap;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Date;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.hadoop.chukwa.hicc.ColorPicker;
+
+
+public class Chart {
+    private String id;
+    private String title;
+    private String graphType;
+    private ArrayList<TreeMap<String, TreeMap<String, Double>>> dataset;
+    private ArrayList<String> chartType;
+	private boolean xLabelOn;
+	private boolean yLabelOn;
+	private boolean yRightLabelOn;
+	private int width;
+	private int height;
+	private List<String> xLabelRange;
+	private HttpServletRequest request = null;
+    private boolean legend;
+    private String xLabel="";
+    private String yLabel="";
+    private String yRightLabel="";
+    private int datasetCounter=0;
+    private double max=0;
+    private int seriesCounter=0;
+    private List<String> rightList;
+    private boolean userDefinedMax = false;
+    private String[] seriesOrder=null;
+    
+	public Chart(HttpServletRequest request) {
+		if(request.getParameter("boxId")!=null) {
+			this.id=request.getParameter("boxId");
+		} else {
+			this.id="0";
+		}
+        this.title="Untitled Chart";
+        this.graphType="image";
+        this.xLabelOn=true;
+        this.yLabelOn=true;
+        this.width=400;
+        this.height=200;
+        this.request=request;
+        this.legend=true;
+        this.max=0;
+        this.datasetCounter=0;
+        this.seriesCounter=0;
+        this.rightList = new ArrayList<String>();
+        this.userDefinedMax=false;
+        this.seriesOrder=null;
+    }
+	
+	public void setYMax(double max) {
+		this.max=max;
+		this.userDefinedMax=true;
+	}
+	
+	public void setSize(int width, int height) {
+		this.width=width;
+		this.height=height;
+	}
+    public void setGraphType(String graphType) {
+    	if(graphType!=null) {
+            this.graphType = graphType;
+    	}
+    }
+    
+    public void setTitle(String title) {
+    	this.title=title;    	
+    }
+    
+    public void setId(String id) {
+    	this.id=id;
+    }
+    
+    public void setDataSet(String chartType, TreeMap<String, TreeMap<String, Double>> data) {
+    	if(this.dataset==null) {
+    		this.dataset = new ArrayList<TreeMap<String, TreeMap<String, Double>>>();
+    		this.chartType = new ArrayList<String>();
+    	}
+   		this.dataset.add(data);
+   		this.chartType.add(chartType);
+    }
+
+	public void setSeriesOrder(String[] metrics) {
+		this.seriesOrder = metrics;
+	}
+
+    public void setXAxisLabels(boolean toggle) {
+    	xLabelOn = toggle;
+    }
+
+    public void setYAxisLabels(boolean toggle) {
+    	yLabelOn = toggle;    	
+    }
+
+    public void setYAxisRightLabels(boolean toggle) {
+    	yRightLabelOn = toggle;    	
+    }
+
+    public void setXAxisLabel(String label) {
+    	xLabel = label;
+    }
+
+    public void setYAxisLabel(String label) {
+    	yLabel = label;
+    }
+
+    public void setYAxisRightLabel(String label) {
+    	yRightLabel = label;
+    }
+
+    public void setXLabelsRange(List<String> range) {
+    	xLabelRange = range;
+    }
+
+    public void setLegend(boolean toggle) {
+    	legend = toggle;
+    }
+    public String plot() {
+    	String output="";
+    	if(dataset==null) {
+    		output = "No Data available.";
+    		return output;
+    	}
+		String dateFormat="%H:%M";
+		if(xLabel.equals("Time")) {
+			SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+	        long xMin;
+			try {
+				xMin = Long.parseLong(xLabelRange.get(0));
+    	        long xMax = Long.parseLong(xLabelRange.get(xLabelRange.size()-1));
+    	        if(xMax-xMin>31536000000L) {
+    	        	dateFormat="%y";
+    	        } else if(xMax-xMin>2592000000L) {
+    	        	dateFormat="%y-%m";
+    	        } else if(xMax-xMin>604800000L) {
+    	        	dateFormat="%m-%d";
+    	        } else if(xMax-xMin>86400000L) {
+    	        	dateFormat="%m-%d %H:%M";
+    	        }
+			} catch (NumberFormatException e) {
+				dateFormat="%y-%m-%d %H:%M";
+			}
+		}
+        output = "<html><link href=\"/hicc/css/default.css\" rel=\"stylesheet\" type=\"text/css\">\n" +
+		         "<body onresize=\"wholePeriod()\"><script type=\"text/javascript\" src=\"/hicc/js/jquery-1.2.6.min.js\"></script>\n"+
+		         "<script type=\"text/javascript\" src=\"/hicc/js/jquery.flot.pack.js\"></script>\n"+
+		         "<script type=\"text/javascript\" src=\"/hicc/js/excanvas.pack.js\"></script>\n"+
+		         "<center>"+title+"</center>\n"+
+		         "<div id=\"placeholder\" style=\"width:"+this.width+"px;height:"+this.height+"px;\"></div>\n"+
+		         "<div id=\"placeholderLegend\"></div>\n"+
+		         "<input type=\"hidden\" id=\"boxId\" value=\"iframe"+this.id+"\">\n"+
+		         "<script type=\"text/javascript\" src=\"/hicc/js/flot.extend.js\">\n" +
+		         "</script>\n" +
+		         "<script type=\"text/javascript\">\n"+
+		         "var cw = document.body.clientWidth-70;\n"+
+		         "var ch = document.body.clientHeight-50;\n"+
+		         "document.getElementById('placeholder').style.width=cw+'px';\n"+
+		         "document.getElementById('placeholder').style.height=ch+'px';\n"+
+		         "_options={\n"+
+		         "        points: { show: false },\n"+
+		         "        xaxis: {                timeformat: \""+dateFormat+"\",\n"+
+		         "	                mode: \"time\"\n"+
+		         "	        },\n"+
+		         "	        selection: { mode: \"x\" },\n"+
+		         "	        grid: {\n"+
+		         "	                clickable: true,\n"+
+		         "	                hoverable: true,\n"+
+		         "	                tickColor: \"#C0C0C0\",\n"+
+		         "	                backgroundColor:\"#FFFFFF\"\n"+
+		         "	        },\n"+
+		         "	        legend: { show: "+this.legend+" },\n"+
+                 "          yaxis: { ";
+        boolean stack = false;
+        for(String type : this.chartType) {
+            if(type.startsWith("stack")) {
+                stack=true;
+            }
+        }
+        if(stack) {
+            output = output + "mode: \"stack\", ";
+        }
+        if(userDefinedMax) {
+        	output = output + "tickFormatter: function(val, axis) { " +
+        	    "return val.toFixed(axis.tickDecimals) + \" %\"; }";
+        } else {
+            output = output + "tickFormatter: function(val, axis) { " +
+		        "if (val > 1000000000000000) return (val / 1000000000000000).toFixed(axis.tickDecimals) + \"PB\";" +
+                "else if (val > 1000000000000) return (val / 1000000000000).toFixed(axis.tickDecimals) + \"TB\";" +
+				"else if (val > 1000000000) return (val / 1000000000).toFixed(axis.tickDecimals) + \"GB\";" +
+        		"else if (val > 1000000) return (val / 1000000).toFixed(axis.tickDecimals) + \"MB\";" +
+        		"else if (val > 1000) return (val / 1000).toFixed(axis.tickDecimals) + \"KB\";" +
+        		"else return val.toFixed(axis.tickDecimals) + \"B\"; }";
+        }
+        if(userDefinedMax) {
+            output = output + ", min:0, max:"+this.max;
+        }
+        output = output + "}\n";
+        output = output +
+		         "	};\n"+
+		         "_series=[\n";
+		            ArrayList<Long> numericLabelRange = new ArrayList<Long>();
+				    if(xLabel.equals("Time")) {
+				        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+				        for(int i=0;i<xLabelRange.size();i++) {
+				        	try {
+				                Date d = formatter.parse(xLabelRange.get(i));
+				                numericLabelRange.add(d.getTime());
+				        	} catch(Exception e) {
+				        	}
+				        }                
+				    } else {
+					    for(int i=0;i<xLabelRange.size();i++) {
+					    	numericLabelRange.add(Long.parseLong(xLabelRange.get(i)));
+					    }
+				    }
+			        ColorPicker cp = new ColorPicker();
+		   			int i=0;
+				    for(TreeMap<String, TreeMap<String, Double>> dataMap : this.dataset) {
+		   	            String[] keyNames = null;
+		   	            if(this.seriesOrder!=null) {
+		   	            	keyNames = this.seriesOrder;
+		   	            } else {
+    		   	            keyNames = ((String[]) dataMap.keySet().toArray(new String[dataMap.size()]));
+		   	            }
+			   			int counter=0;
+			   			if(i!=0) {
+			   				if(!this.userDefinedMax) {
+			   				    this.max=0;
+			   				}
+			   			}
+			   			for(String ki : keyNames) {
+			   				TreeMap<String, Double> data = dataMap.get(ki);
+			   				if(data!=null) {
+			   				    for(String dp: data.keySet()) {
+			   				    	try {
+			   					        if(data.get(dp)>this.max) {
+			   						        if(!this.userDefinedMax) {
+			   						            this.max=data.get(dp);
+			   						        }
+			   					        }
+			   				    	} catch (NullPointerException e) {
+			   				    		// skip if this data point does not exist.
+			   				    	}
+			   				    }
+			   				}
+			   			}    			   			
+		   	            for(String seriesName : keyNames) {
+		   	   			    int counter2=0;
+					 	    if(counter!=0) {
+		   					    output+=",";
+		   				    }
+					 	    String param="fill: false";
+					 	    String type="lines";
+					 	    if(this.chartType.get(i).equals("stack-area") || this.chartType.get(i).equals("area")) {
+					 	    	param="fill: true";
+					 	    }
+					 	    if(this.chartType.get(i).equals("bar")) {
+					 	    	type="bars";
+					 	    	param="stepByStep: true";
+					 	    }
+					 	    output+="  {"+type+": { show: true, "+param+" }, color: \""+cp.get(counter+1)+"\", label: \""+seriesName+"\", ";
+					 	    String showYAxis="false";
+					 	    String shortRow="false";
+					 	    if(counter==0 || i>0) {
+					 	    	showYAxis="true";
+					 	    	shortRow="false";
+					 	    }
+					 	    output+=" row: { show: "+showYAxis+",shortRow:"+shortRow+", showYAxis:"+showYAxis+"}, data:[";
+		   	        	    TreeMap<String, Double> data = dataMap.get(seriesName);
+		   	        	    for(String dp : data.keySet()) {
+		   			 	        if(counter2!=0) {
+		   					        output+=",";
+		   				        }
+		   				        output+="["+dp+","+data.get(dp)+"]";
+		   				        counter2++;
+		   			        }
+		   	   			    output+="], min:0, max:"+this.max+"}";
+		   	   			    counter++;
+		   	            }
+		   	            i++;
+		    	    }          
+		output+=" ];\n"+
+		         " wholePeriod();</script></body></html>\n";
+    	return output;
+    }            
+}
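
For illustration only, a minimal sketch of a servlet driving the Chart class above; the metric name, data points and chart type are placeholders. Chart treats "area", "stack-area" and "bar" specially and renders any other chart-type string as a line series, and setting the x-axis label to "Time" switches the axis to flot's time mode, so the x values below are epoch milliseconds:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.chukwa.hicc.Chart;

public class ChartDemoServlet extends HttpServlet {
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        // Placeholder series: one metric, keyed by epoch-millisecond timestamps as strings.
        long now = System.currentTimeMillis();
        TreeMap<String, Double> points = new TreeMap<String, Double>();
        points.put(Long.toString(now - 60000L), 10.0);
        points.put(Long.toString(now), 25.0);
        TreeMap<String, TreeMap<String, Double>> series = new TreeMap<String, TreeMap<String, Double>>();
        series.put("bytes_written", points);

        // x-axis range uses the same epoch-millisecond string convention as the data point keys.
        List<String> xRange = new ArrayList<String>();
        xRange.add(Long.toString(now - 60000L));
        xRange.add(Long.toString(now));

        Chart chart = new Chart(request);   // reads the optional "boxId" request parameter
        chart.setTitle("Demo chart");
        chart.setSize(400, 200);
        chart.setXAxisLabel("Time");        // "Time" switches the x axis to flot's time mode
        chart.setYAxisLabel("bytes");
        chart.setXLabelsRange(xRange);
        chart.setDataSet("line", series);   // anything other than area/stack-area/bar renders as lines

        response.setContentType("text/html");
        response.getWriter().print(chart.plot());
    }
}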

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/ClusterConfig.java Fri Dec  5 12:30:14 2008
@@ -23,7 +23,7 @@
 
 public class ClusterConfig {
     public static HashMap<String, String> clusterMap = new HashMap<String, String>();
-    private String path=System.getenv("CHUKWA_HOME")+File.separator+"conf"+File.separator;
+    private String path=System.getenv("CHUKWA_CONF_DIR")+File.separator;
     static public String getContents(File aFile) {
         //...checks on aFile are elided
         StringBuffer contents = new StringBuffer();

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/DatasetMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/DatasetMapper.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/DatasetMapper.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/DatasetMapper.java Fri Dec  5 12:30:14 2008
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.chukwa.hicc;
 
+import java.text.SimpleDateFormat;
+import java.util.TreeMap;
 import java.util.HashMap;
 import java.util.ArrayList;
 import java.util.List;
@@ -29,19 +31,21 @@
 public class DatasetMapper {
     private String jdbc;
     private static Log log = LogFactory.getLog(DatasetMapper.class);
-    private HashMap<String, ArrayList<Double>> dataset;
+    private TreeMap<String, TreeMap<String, Double>> dataset;
     private List<String> labels;
 	public DatasetMapper(String jdbc) {
 	    this.jdbc=jdbc;
-	    this.dataset = new HashMap<String, ArrayList<Double>>();
+	    this.dataset = new TreeMap<String, TreeMap<String, Double>>();
 	    this.labels = new ArrayList<String>();
 	}
-	public void execute(String query, boolean groupBySecondColumn) {
+	public void execute(String query, boolean groupBySecondColumn, boolean calculateSlope, String formatTime) {
+		SimpleDateFormat sdf = null;
 		dataset.clear();
 	    try {
 	        // The newInstance() call is a work around for some
 	        // broken Java implementations
-	        Class.forName("com.mysql.jdbc.Driver").newInstance();
+                String jdbcDriver = System.getenv("JDBC_DRIVER");
+	        Class.forName(jdbcDriver).newInstance();
 	    } catch (Exception ex) {
 	        // handle the error
 	    }
@@ -61,11 +65,18 @@
 	            rs = stmt.getResultSet();
 	            ResultSetMetaData rmeta = rs.getMetaData();
 	            int col=rmeta.getColumnCount();
+	            double[] previousArray = new double[col+1];
+	            for(int k=0;k<col;k++) {
+	            	previousArray[k]=0.0;
+	            }
 	            int i=0;
-	            java.util.ArrayList<Double> data = null;
+	            java.util.TreeMap<String, Double> data = null;
+	            HashMap<String, Double> previousHash = new HashMap<String, Double>();
 	            HashMap<String, Integer> xAxisMap = new HashMap<String, Integer>();
 	            while (rs.next()) {
-	                String label = rs.getString(1);
+	                String label = "";
+	                long  time = rs.getTimestamp(1).getTime();
+	                label = ""+time;
 	                if(!xAxisMap.containsKey(label)) {
 	                    xAxisMap.put(label, i);
 	                    labels.add(label);
@@ -74,15 +85,33 @@
 	                if(groupBySecondColumn) {
 	                    String item = rs.getString(2);
 	                    // Get the data from the row using the series column
-	                    double current = rs.getDouble(3);
-	                    if(current>max) {
-	                        max=current;
-	                    }
 	                    data = dataset.get(item);
 	                    if(data == null) {
-	                        data = new java.util.ArrayList<Double>();
+	                        data = new java.util.TreeMap<String, Double>();
+	                    }
+	                    if(calculateSlope) {
+	                    	double current = rs.getDouble(3);
+	                    	double tmp = 0L;
+	                    	if(data.size()>1) {
+                            	tmp = current - previousHash.get(item).doubleValue();
+                            } else {
+                            	tmp = 0;
+                            }
+                            if(tmp<0) {
+                                tmp=0;
+                            }
+                            if(tmp>max) {
+                            	max=tmp;
+                            }
+                            previousHash.put(item,current);
+                            data.put(label, tmp);
+	                    } else {
+	                    	double current = rs.getDouble(3);
+		                    if(current>max) {
+		                        max=current;
+		                    }
+		                    data.put(label, current);	                    	
 	                    }
-	                    data.add(rs.getDouble(3));
 	                    dataset.put(item,data);
 	                } else {
 	                    for(int j=2;j<=col;j++) {
@@ -94,16 +123,30 @@
 	                        }
 	                        data = dataset.get(item);
 	                        if(data == null) {
-	                            data = new java.util.ArrayList<Double>();
+	                            data = new java.util.TreeMap<String, Double>();
+	                        }
+	                        if(calculateSlope) {
+	                        	double tmp = rs.getDouble(j);
+                                if(data.size()>1) {
+	                        	    tmp = tmp - previousArray[j];
+                                } else {
+                                    tmp = 0.0;
+                                }
+                                previousArray[j]=current;
+                                if(tmp<0) {
+                                  	tmp=0;
+                                }
+	                        	data.put(label, tmp);
+	                        } else {
+		                        data.put(label, current);	                        	
 	                        }
-	                        data.add(rs.getDouble(j));
 	                        dataset.put(item,data);
 	                    }
 	                }
 	            }
 	            labelsCount=i;
 	        } else {
-	                log.error("query is not executed.");
+	            log.error("query is not executed.");
 	        }
 	        // Now do something with the ResultSet ....
 	    } catch (SQLException ex) {
@@ -146,7 +189,7 @@
 	public List<String> getXAxisMap() {
 		return labels;
 	}
-	public HashMap<String, ArrayList<Double>> getDataset() {
+	public TreeMap<String, TreeMap<String, Double>> getDataset() {
 		return dataset;
 	}
 }
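
For illustration only, a minimal sketch of feeding the reworked DatasetMapper into a chart-style consumer. The JDBC URL and query are placeholders; the query is assumed to return a timestamp in column 1, a series name in column 2 and a numeric value in column 3, the JDBC_DRIVER environment variable must name the driver class, and null for the new formatTime argument is only a guess at an acceptable value:

import java.util.TreeMap;

import org.apache.hadoop.chukwa.hicc.DatasetMapper;

public class DatasetMapperDemo {
    public static void main(String[] args) {
        // Placeholder JDBC URL and query; JDBC_DRIVER in the environment must name
        // the driver class (previously hard-coded to com.mysql.jdbc.Driver).
        String jdbc = "jdbc:mysql://localhost:3306/chukwa?user=demo&password=demo";
        String query = "select timestamp, host, bytes_written from disk_metrics";

        DatasetMapper mapper = new DatasetMapper(jdbc);
        // group by the second column (one series per host), no slope calculation,
        // formatTime passed as null purely as a placeholder
        mapper.execute(query, true, false, null);

        // Outer keys are series names, inner maps are timestamp(label) -> value,
        // which matches the shape Chart.setDataSet() expects.
        TreeMap<String, TreeMap<String, Double>> dataset = mapper.getDataset();
        System.out.println("series: " + dataset.keySet());
        System.out.println("x axis labels: " + mapper.getXAxisMap());
    }
}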