Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC

svn commit: r752666 [5/16] - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/database/ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoop...

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java Wed Mar 11 22:39:26 2009
@@ -18,15 +18,16 @@
 
 package org.apache.hadoop.chukwa.datacollection.writer;
 
-import java.util.List;
 
+import java.util.List;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.conf.Configuration;
 
-public interface ChukwaWriter
-{
-	public void init(Configuration c) throws WriterException;
-	public void add(List<Chunk> chunks) throws WriterException;
-	public void close() throws WriterException;;
+public interface ChukwaWriter {
+  public void init(Configuration c) throws WriterException;
+
+  public void add(List<Chunk> chunks) throws WriterException;
+
+  public void close() throws WriterException;;
 
 }
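
For reference, the reformatted ChukwaWriter interface above is the collector's output extension point: init, a bulk add, and close. A minimal implementation might look like the sketch below; the LoggingWriter class is illustrative only and is not part of this commit.

package org.apache.hadoop.chukwa.datacollection.writer;

import java.util.List;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.conf.Configuration;

public class LoggingWriter implements ChukwaWriter {
  public void init(Configuration c) throws WriterException {
    // no setup needed for this sketch
  }

  public void add(List<Chunk> chunks) throws WriterException {
    for (Chunk chunk : chunks) {
      // print one line per chunk; a real writer would persist the data
      System.out.println(chunk.getDataType() + ": "
          + chunk.getData().length + " bytes");
    }
  }

  public void close() throws WriterException {
    // nothing to release
  }
}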

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ClientAck.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ClientAck.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ClientAck.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ClientAck.java Wed Mar 11 22:39:26 2009
@@ -18,89 +18,83 @@
 
 package org.apache.hadoop.chukwa.datacollection.writer;
 
+
 import org.apache.log4j.Logger;
 
-public class ClientAck
-{
-	static Logger log = Logger.getLogger(ClientAck.class);
-	
-	// TODO move all constant to config
-	
-	public static final int OK = 100;
-	public static final int KO = -100;
-	public static final int KO_LOCK = -200;
-	
-	private long ts = 0;
-	
-	private Object lock = new Object();
-	private int status = 0;
-	private Throwable exception = null;
-	private int waitTime = 6*1000;// 6 secs
-	private int timeOut = 15*1000;
-	
-	public ClientAck()
-	{
-		this.ts = System.currentTimeMillis() + timeOut;
-	}
-	
-  public int getTimeOut()
-  {
+public class ClientAck {
+  static Logger log = Logger.getLogger(ClientAck.class);
+
+  // TODO move all constant to config
+
+  public static final int OK = 100;
+  public static final int KO = -100;
+  public static final int KO_LOCK = -200;
+
+  private long ts = 0;
+
+  private Object lock = new Object();
+  private int status = 0;
+  private Throwable exception = null;
+  private int waitTime = 6 * 1000;// 6 secs
+  private int timeOut = 15 * 1000;
+
+  public ClientAck() {
+    this.ts = System.currentTimeMillis() + timeOut;
+  }
+
+  public int getTimeOut() {
     return timeOut;
   }
 
-  public void wait4Ack()
-	{
-		synchronized(lock)
-		{
-//			log.info(">>>>>>>>>>>>>>>>>>>>>>>>> Client synch");
-			while (this.status == 0)
-			{
-//				log.info(">>>>>>>>>>>>>>>>>>>>>>>>> Client Before wait");
-				try { lock.wait(waitTime);}
-				catch(InterruptedException e)
-				{}
-				long now = System.currentTimeMillis();
-				if (now > ts)
-				{
-					this.status = KO_LOCK;
-					this.exception = new RuntimeException("More than maximum time lock [" + this.toString() +"]");
-				}
-			}
-//			log.info("[" + Thread.currentThread().getName() + "] >>>>>>>>>>>>>>>>> Client after wait status [" + status +  "] [" + this.toString() + "]");
-		}
-	}
-
-	public void releaseLock(int status, Throwable exception)
-	{
-		this.exception = exception;
-		this.status = status;
-		
-//		log.info("[" + Thread.currentThread().getName() + "] <<<<<<<<<<<<<<<<< Server synch [" + status + "] ----->>>> [" + this.toString() + "]");
-		synchronized(lock)
-		{		
-//			log.info("<<<<<<<<<<<<<<< Server before notify");
-			lock.notifyAll();
-		}
-//		log.info("<<<<<<<<<<<<<<< Server after notify");
-	}
-	
-	public int getStatus()
-	{
-		return status;
-	}
-
-	public void setStatus(int status)
-	{
-		this.status = status;
-	}
-
-	public Throwable getException()
-	{
-		return exception;
-	}
-
-	public void setException(Throwable exception)
-	{
-		this.exception = exception;
-	}
+  public void wait4Ack() {
+    synchronized (lock) {
+      // log.info(">>>>>>>>>>>>>>>>>>>>>>>>> Client synch");
+      while (this.status == 0) {
+        // log.info(">>>>>>>>>>>>>>>>>>>>>>>>> Client Before wait");
+        try {
+          lock.wait(waitTime);
+        } catch (InterruptedException e) {
+        }
+        long now = System.currentTimeMillis();
+        if (now > ts) {
+          this.status = KO_LOCK;
+          this.exception = new RuntimeException("More than maximum time lock ["
+              + this.toString() + "]");
+        }
+      }
+      // log.info("[" + Thread.currentThread().getName() +
+      // "] >>>>>>>>>>>>>>>>> Client after wait status [" + status + "] [" +
+      // this.toString() + "]");
+    }
+  }
+
+  public void releaseLock(int status, Throwable exception) {
+    this.exception = exception;
+    this.status = status;
+
+    // log.info("[" + Thread.currentThread().getName() +
+    // "] <<<<<<<<<<<<<<<<< Server synch [" + status + "] ----->>>> [" +
+    // this.toString() + "]");
+    synchronized (lock) {
+      // log.info("<<<<<<<<<<<<<<< Server before notify");
+      lock.notifyAll();
+    }
+    // log.info("<<<<<<<<<<<<<<< Server after notify");
+  }
+
+  public int getStatus() {
+    return status;
+  }
+
+  public void setStatus(int status) {
+    this.status = status;
+  }
+
+  public Throwable getException() {
+    return exception;
+  }
+
+  public void setException(Throwable exception) {
+    this.exception = exception;
+  }
 }
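
ClientAck coordinates a thread that blocks in wait4Ack() with another thread that later calls releaseLock(); the sketch below shows that handshake in isolation. The demo class and thread bodies are illustrative, not code from this commit.

import org.apache.hadoop.chukwa.datacollection.writer.ClientAck;

public class ClientAckDemo {
  public static void main(String[] args) throws InterruptedException {
    final ClientAck ack = new ClientAck();

    Thread writerThread = new Thread(new Runnable() {
      public void run() {
        ack.wait4Ack(); // returns once released, or times out with KO_LOCK
        if (ack.getStatus() != ClientAck.OK) {
          System.err.println("write not acknowledged: " + ack.getException());
        }
      }
    });
    writerThread.start();

    ack.releaseLock(ClientAck.OK, null); // wakes every waiter with a success status
    writerThread.join();
  }
}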

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java Wed Mar 11 22:39:26 2009
@@ -18,88 +18,84 @@
 
 package org.apache.hadoop.chukwa.datacollection.writer;
 
+
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
-
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.conf.Configuration;
 
 public class ConsoleWriter implements ChukwaWriter {
 
   boolean printData;
-  volatile long dataSize=0;
+  volatile long dataSize = 0;
   final Timer statTimer;
-  
-  
+
   private class StatReportingTask extends TimerTask {
-    private long lastTs=System.currentTimeMillis();
-    private long lastDataSize=0;
+    private long lastTs = System.currentTimeMillis();
+    private long lastDataSize = 0;
+
     public void run() {
-      long time =  System.currentTimeMillis();
-      long interval= time - lastTs;
+      long time = System.currentTimeMillis();
+      long interval = time - lastTs;
       lastTs = time;
-      
+
       long ds = dataSize;
-      long dataRate =  1000 * (ds - lastDataSize) / interval;  //bytes/sec
-        //refers only to data field, not including http or chukwa headers
+      long dataRate = 1000 * (ds - lastDataSize) / interval; // bytes/sec
+      // refers only to data field, not including http or chukwa headers
       lastDataSize = ds;
-      
-      System.out.println("stat=datacollection.writer.ConsoleWriter|dataRate=" + dataRate );
+
+      System.out.println("stat=datacollection.writer.ConsoleWriter|dataRate="
+          + dataRate);
     }
   };
-  
 
   public ConsoleWriter() {
     this(true);
   }
-  
+
   public ConsoleWriter(boolean printData) {
     this.printData = printData;
     statTimer = new Timer();
   }
-  
-  public void close()
-  {
+
+  public void close() {
     statTimer.cancel();
   }
 
-  public void init(Configuration conf) throws WriterException
-  {
-     System.out.println("----  DUMMY HDFS WRITER IN USE ---");
+  public void init(Configuration conf) throws WriterException {
+    System.out.println("----  DUMMY HDFS WRITER IN USE ---");
 
-     statTimer.schedule(new StatReportingTask(), 1000,10*1000);
+    statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
   }
 
-  public void add(Chunk data) throws WriterException
-  {
+  public void add(Chunk data) throws WriterException {
     int startOffset = 0;
 
     dataSize += data.getData().length;
-    if(printData) {
+    if (printData) {
       System.out.println(data.getData().length + " bytes of data in chunk");
 
-      for(int offset: data.getRecordOffsets()) {
+      for (int offset : data.getRecordOffsets()) {
         System.out.print(data.getStreamName());
         System.out.print(" ");
         System.out.print(data.getSource());
         System.out.print(" ");
         System.out.print(data.getDataType());
         System.out.print(") ");
-        System.out.print(new String(data.getData(), startOffset, offset - startOffset + 1));
-        startOffset= offset + 1;
+        System.out.print(new String(data.getData(), startOffset, offset
+            - startOffset + 1));
+        startOffset = offset + 1;
       }
     }
   }
 
-@Override
-public void add(List<Chunk> chunks) throws WriterException
-{
-	for(Chunk chunk: chunks)
-	{
-		add(chunk);
-	}
-	
-}
+  @Override
+  public void add(List<Chunk> chunks) throws WriterException {
+    for (Chunk chunk : chunks) {
+      add(chunk);
+    }
+
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java Wed Mar 11 22:39:26 2009
@@ -17,21 +17,21 @@
  */
 package org.apache.hadoop.chukwa.datacollection.writer;
 
+
 import java.util.ArrayDeque;
 import java.util.HashSet;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
-
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.conf.Configuration;
 
 public class Dedup implements PipelineableWriter {
-  
+
   static final class DedupKey {
     String name;
-    long val; //sequence number
-    
+    long val; // sequence number
+
     public DedupKey(String n, long p) {
       name = n;
       val = p;
@@ -42,43 +42,45 @@
     }
 
     public boolean equals(Object dk) {
-      if(dk instanceof DedupKey)
-        return name.equals(((DedupKey)dk).name) && val == ((DedupKey)dk).val;
-      else return false;
+      if (dk instanceof DedupKey)
+        return name.equals(((DedupKey) dk).name) && val == ((DedupKey) dk).val;
+      else
+        return false;
     }
   }
-  
+
   static class FixedSizeCache<EntryType> {
     final HashSet<EntryType> hs;
     final Queue<EntryType> toDrop;
     final int maxSize;
-    volatile long dupchunks =0;
+    volatile long dupchunks = 0;
+
     public FixedSizeCache(int size) {
       maxSize = size;
       hs = new HashSet<EntryType>(maxSize);
       toDrop = new ArrayDeque<EntryType>(maxSize);
     }
-    
+
     public synchronized void add(EntryType t) {
-      if(maxSize == 0)
+      if (maxSize == 0)
         return;
-      
-      if(hs.size() >= maxSize) 
-        while(hs.size() >= maxSize) {
+
+      if (hs.size() >= maxSize)
+        while (hs.size() >= maxSize) {
           EntryType td = toDrop.remove();
           hs.remove(td);
         }
-      
+
       hs.add(t);
       toDrop.add(t);
     }
-    
+
     private synchronized boolean addAndCheck(EntryType t) {
-      if(maxSize == 0)
+      if (maxSize == 0)
         return false;
-      
-      boolean b= hs.contains(t);
-      if(b)
+
+      boolean b = hs.contains(t);
+      if (b)
         dupchunks++;
       else {
         hs.add(t);
@@ -86,12 +88,11 @@
       }
       return b;
     }
-    
+
     private long dupCount() {
       return dupchunks;
     }
   }
-  
 
   FixedSizeCache<DedupKey> cache;
   ChukwaWriter next;
@@ -104,11 +105,11 @@
   @Override
   public void add(List<Chunk> chunks) throws WriterException {
     ArrayList<Chunk> passedThrough = new ArrayList<Chunk>();
-    for(Chunk c: chunks)
-      if(! cache.addAndCheck(new DedupKey(c.getStreamName(), c.getSeqID())))
+    for (Chunk c : chunks)
+      if (!cache.addAndCheck(new DedupKey(c.getStreamName(), c.getSeqID())))
         passedThrough.add(c);
-    
-    if(!passedThrough.isEmpty())
+
+    if (!passedThrough.isEmpty())
       next.add(passedThrough);
   }
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java Wed Mar 11 22:39:26 2009
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.chukwa.datacollection.writer;
 
+
 import java.io.*;
 import java.util.List;
-
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.conf.Configuration;
@@ -64,22 +64,22 @@
    * 
    * @param bytes amount to try to read
    * @param ms time to wait
-  * @return a newly read-in chunk
+   * @return a newly read-in chunk
    * @throws IOException
    */
   public Chunk readOutChunk(int bytes, int ms) throws IOException {
 
     long readStartTime = System.currentTimeMillis();
     try {
-      while(buf.size() < bytes )  {
-        synchronized(this) {
+      while (buf.size() < bytes) {
+        synchronized (this) {
           long timeLeft = ms - System.currentTimeMillis() + readStartTime;
-          if(timeLeft > 0)
-              wait(timeLeft);
+          if (timeLeft > 0)
+            wait(timeLeft);
         }
       }
-      if(dis == null)
-       dis = new DataInputStream( new ByteArrayInputStream(buf.toByteArray()));
+      if (dis == null)
+        dis = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
       return ChunkImpl.read(dis);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java Wed Mar 11 22:39:26 2009
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.chukwa.datacollection.writer;
 
+
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
@@ -28,7 +28,7 @@
 public class PipelineStageWriter implements ChukwaWriter {
   Logger log = Logger.getLogger(PipelineStageWriter.class);
 
-  ChukwaWriter writer; //head of pipeline
+  ChukwaWriter writer; // head of pipeline
 
   @Override
   public void add(List<Chunk> chunks) throws WriterException {
@@ -47,45 +47,50 @@
       try {
         String[] classes = pipeline.split(",");
         ArrayList<PipelineableWriter> stages = new ArrayList<PipelineableWriter>();
-        
-        PipelineableWriter lastWriter= null;
-        if(classes.length > 1) {
-          lastWriter = (PipelineableWriter) conf.getClassByName(classes[0]).newInstance();
+
+        PipelineableWriter lastWriter = null;
+        if (classes.length > 1) {
+          lastWriter = (PipelineableWriter) conf.getClassByName(classes[0])
+              .newInstance();
           lastWriter.init(conf);
           writer = lastWriter;
         }
-        
-        for(int i = 1; i < classes.length -1; ++i) {
+
+        for (int i = 1; i < classes.length - 1; ++i) {
           Class stageClass = conf.getClassByName(classes[i]);
           Object st = stageClass.newInstance();
-          if(!(st instanceof PipelineableWriter))
-            log.error("class "+ classes[i]+ " in processing pipeline isn't a pipeline stage");
-              
-          PipelineableWriter stage =  (PipelineableWriter) stageClass.newInstance();
+          if (!(st instanceof PipelineableWriter))
+            log.error("class " + classes[i]
+                + " in processing pipeline isn't a pipeline stage");
+
+          PipelineableWriter stage = (PipelineableWriter) stageClass
+              .newInstance();
           stage.init(conf);
-          //throws exception if types don't match or class not found; this is OK.
-          
+          // throws exception if types don't match or class not found; this is
+          // OK.
+
           lastWriter.setNextStage(stage);
           lastWriter = stage;
         }
-        Class stageClass = conf.getClassByName(classes[classes.length-1]);
+        Class stageClass = conf.getClassByName(classes[classes.length - 1]);
         Object st = stageClass.newInstance();
-        
-        if(!(st instanceof ChukwaWriter)) {
-          log.error("class "+ classes[classes.length-1]+ " at end of processing pipeline isn't a ChukwaWriter");
+
+        if (!(st instanceof ChukwaWriter)) {
+          log.error("class " + classes[classes.length - 1]
+              + " at end of processing pipeline isn't a ChukwaWriter");
           throw new WriterException("bad pipeline");
         } else {
-          if(lastWriter != null)
+          if (lastWriter != null)
             lastWriter.setNextStage((ChukwaWriter) st);
           else
-            writer = (ChukwaWriter) st; //one stage pipeline
+            writer = (ChukwaWriter) st; // one stage pipeline
         }
-        return; 
-      } catch(Exception e) {
-        //if anything went wrong (missing class, etc) we wind up here.
-          log.error("failed to set up pipeline, defaulting to SeqFileWriter",e);
-          //fall through to default case
-          throw new WriterException("bad pipeline");
+        return;
+      } catch (Exception e) {
+        // if anything went wrong (missing class, etc) we wind up here.
+        log.error("failed to set up pipeline, defaulting to SeqFileWriter", e);
+        // fall through to default case
+        throw new WriterException("bad pipeline");
       }
     } else {
       throw new WriterException("must set chukwaCollector.pipeline");

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.datacollection.writer;
 
-public interface PipelineableWriter extends ChukwaWriter{
-	public void setNextStage(ChukwaWriter next);
+
+public interface PipelineableWriter extends ChukwaWriter {
+  public void setNextStage(ChukwaWriter next);
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.datacollection.writer;
 
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -25,7 +26,6 @@
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
-
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
@@ -41,384 +41,352 @@
  * this object.
  * 
  */
-public class SeqFileWriter implements ChukwaWriter
-{
-	public static final boolean ENABLE_ROTATION = true;
-	
-	static final int STAT_INTERVAL_SECONDS = 30;
-	static final Object lock = new Object();
-	
-    static Logger log = Logger.getLogger(SeqFileWriter.class);
-  
-	private FileSystem fs = null;
-	private Configuration conf = null;
-
-	private String outputDir = null;
-	private Calendar calendar = Calendar.getInstance();
-
-	private Path currentPath = null;
-	private String currentFileName = null;
-	private FSDataOutputStream currentOutputStr = null;
-	private static SequenceFile.Writer seqFileWriter = null;
-	
-	private static ClientAck clientAck = new ClientAck();
-	private static long nextRotate = 0;
-	private static int rotateInterval = 1000*60;
-	
-	private static Timer clientAckTimer = null;
-	
-	private Timer timer = null;
-
-	private Timer statTimer = null;
-	private volatile long dataSize = 0;
-
-	
-	private int initWriteChunkRetries = 10;
-	private int writeChunkRetries = initWriteChunkRetries;
-	private boolean chunksWrittenThisRotate = false;
-	
-	public SeqFileWriter() throws WriterException
-	{
-	}
-
-	public void init(Configuration conf) throws WriterException
-	{
-		outputDir = conf.get("chukwaCollector.outputDir", "/chukwa");
-
-		rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
-				1000 * 60 * 5);//defaults to 5 minutes
-		nextRotate = System.currentTimeMillis() + rotateInterval;
-		
-		initWriteChunkRetries = conf.getInt("chukwaCollector.writeChunkRetries", 10);
-		writeChunkRetries = initWriteChunkRetries;
-		
-		//check if they've told us the file system to use
-	    String fsname = conf.get("writer.hdfs.filesystem");
-	    if (fsname == null || fsname.equals(""))
-	    {
-	      //otherwise try to get the filesystem from hadoop
-	      fsname = conf.get("fs.default.name");
-	    }
-		
-
-		log.info("rotateInterval is " + rotateInterval);
-		log.info("outputDir is " + outputDir);
-		log.info("fsname is " + fsname);
-		log.info("filesystem type from core-default.xml is "
-				+ conf.get("fs.hdfs.impl"));
-
-		if (fsname == null) {
-			log.error("no filesystem name");
-			throw new WriterException("no filesystem");
-		}	try {
-			fs = FileSystem.get(new URI(fsname), conf);
-			if (fs == null) {
-				log.error("can't connect to HDFS at " + fs.getUri());
-				return;
-			} else
-				log.info("filesystem is " + fs.getUri());
-		} catch (IOException e) {
-			log.error(
-							"can't connect to HDFS, trying default file system instead (likely to be local)",
-							e);
-			try	{
-				fs = FileSystem.get(conf);
-			} catch (IOException err) {
-				log.error("can't connect to default file system either", e);
-			}
-		} catch (URISyntaxException e) 	{
-			log.error("problem generating new URI from config setting");
-			return;
-		}
-
-		// Setup everything by rotating
-		rotate();
-		 
-		clientAckTimer = new Timer();
-		clientAckTimer.schedule(new TimerTask()
-		{
-			public void run() 
-			{
-				synchronized (lock) 
-				{
-					ClientAck previous = clientAck ;
-					SeqFileWriter.clientAck = new ClientAck();
-					
-					try
-					{
-						// SeqFile is uncompressed for now
-						// So we can flush every xx secs
-						// But if we're using block Compression
-						// this is not true anymore
-						// because this will trigger
-						// the compression
-						if (currentOutputStr != null)
-						{
-							currentOutputStr.flush(); 
-						}
-						previous.releaseLock(ClientAck.OK, null);
-						long now = System.currentTimeMillis();
-						if (now >= nextRotate)
-						{
-							nextRotate = System.currentTimeMillis() + rotateInterval;
-							rotate();
-						}
-					}
-					catch(Throwable e)
-					{
-						previous.releaseLock(ClientAck.KO, e);
-						log.warn("Exception when flushing ", e);
-						e.printStackTrace();
-					}	
-				}
-			}
-
-		}, (5*1000), (5*1000));
-		
-		statTimer = new Timer();
-		statTimer.schedule(new StatReportingTask(), 1000, STAT_INTERVAL_SECONDS * 1000);
-		
-		
-		
-	}
-
-	private class StatReportingTask extends TimerTask
-	{
-		private long lastTs = System.currentTimeMillis();
-
-		public void run()
-		{
-			
-		  long time = System.currentTimeMillis();
-			long currentDs = dataSize;
-			dataSize = 0;
-			
-		  long interval = time - lastTs;
-			lastTs = time;
-
-			long dataRate = 1000 * currentDs / interval; // kb/sec
-			log.info("stat:datacollection.writer.hdfs dataSize=" + currentDs + " dataRate=" + dataRate);
-		}
-	};
-
-	
-	void rotate()
-	{
-		calendar.setTimeInMillis(System.currentTimeMillis());
-
-		log.info("start Date [" + calendar.getTime() + "]");
-		log.info("Rotate from " + Thread.currentThread().getName());
-
-		String newName = new java.text.SimpleDateFormat("yyyyddHHmmssSSS").format(calendar.getTime());
-		newName += "_" + new java.rmi.server.UID().toString();
-		newName = newName.replace("-", "");
-		newName = newName.replace(":", "");
-		newName = newName.replace(".", "");
-		newName = outputDir + "/" + newName.trim();
-
-
-		synchronized (lock) 
-		{
-			try
-			{
-				FSDataOutputStream previousOutputStr = currentOutputStr;
-				Path previousPath = currentPath;
-				String previousFileName = currentFileName;
-
-				if (previousOutputStr != null) 	
-				{
-					previousOutputStr.close();
-					if(chunksWrittenThisRotate) {
-					  fs.rename(previousPath, new Path(previousFileName + ".done"));
-					} else {
-					  log.info("no chunks written to "+ previousPath + ", deleting");
-					  fs.delete(previousPath, false);
-					}
-				}
-				Path newOutputPath = new Path(newName + ".chukwa");			
-				FSDataOutputStream newOutputStr = fs.create(newOutputPath);
-				currentOutputStr = newOutputStr;
-				currentPath = newOutputPath;
-				currentFileName = newName;
-				chunksWrittenThisRotate = false;
-				// Uncompressed for now
-				seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
-						ChukwaArchiveKey.class, ChunkImpl.class,
-						SequenceFile.CompressionType.NONE, null);
-				
-			}
-			catch (IOException e)
-			{
-				log.fatal("IO Exception in rotate. Exiting!");
-				e.printStackTrace();
-				// TODO  
-				// As discussed for now:
-				// Everytime this happen in the past it was because HDFS was down, 
-				// so there's nothing we can do
-				// Shutting down the collector for now
-				// Watchdog will re-start it automatically
-				System.exit(-1);
-			}		
-		}
-
-		log.debug("finished rotate()");
-	}
-
-	// TODO merge the 2 add functions
-	@Override
-	public void add(List<Chunk> chunks) throws WriterException
-	{
-		if (chunks != null) {
-			try {
-		    chunksWrittenThisRotate = true;
-				ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
-
-				// FIXME compute this once an hour
-				// 
-				synchronized (calendar)
-				{
-					calendar.setTimeInMillis(System.currentTimeMillis());
-					calendar.set(Calendar.MINUTE, 0);
-					calendar.set(Calendar.SECOND, 0);
-					calendar.set(Calendar.MILLISECOND, 0);
-
-					archiveKey.setTimePartition(calendar.getTimeInMillis());
-				}
-
-				ClientAck localClientAck = null;					
-				synchronized(lock)
-				{
-					localClientAck = SeqFileWriter.clientAck;
-					for (Chunk chunk : chunks)
-					{
-						archiveKey.setDataType(chunk.getDataType());
-						archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource() + "/" + chunk.getStreamName());
-						archiveKey.setSeqId(chunk.getSeqID());
-
-						if (chunk != null) 	
-						{
-							seqFileWriter.append(archiveKey, chunk);
-							// compute size for stats
-							dataSize += chunk.getData().length;
-						}
-
-					}
-				}// End synchro
-
-				localClientAck.wait4Ack();
-				if (localClientAck.getStatus() != ClientAck.OK)
-				{
-					log.warn("Exception after notyfyAll on the lock - Thread:" + Thread.currentThread().getName(),localClientAck.getException());
-					throw new WriterException(localClientAck.getException());
-				}
-				else
-				{
-					// sucess
-					writeChunkRetries = initWriteChunkRetries;
-				}
-
-			}
-			catch (IOException e) 
-			{
-				writeChunkRetries --;
-				log.error("Could not save the chunk. ", e);
-
-				if (writeChunkRetries < 0)
-				{
-					log.fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
-					System.exit(-1);
-				}
-				throw new WriterException(e);
-			}
-		}
-
-	}
-	
-	public void add(Chunk chunk) throws WriterException
-	{
-	  
-		if (chunk != null) 	{
-			try {
-				ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
-
-				// FIXME compute this once an hour
-				synchronized (calendar)
-				{
-					calendar.setTimeInMillis(System.currentTimeMillis());
-					calendar.set(Calendar.MINUTE, 0);
-					calendar.set(Calendar.SECOND, 0);
-					calendar.set(Calendar.MILLISECOND, 0);
-
-					archiveKey.setTimePartition(calendar.getTimeInMillis());
-				}
-
-				archiveKey.setDataType(chunk.getDataType());
-				archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource() + "/" + chunk.getStreamName());
-				archiveKey.setSeqId(chunk.getSeqID());
-
-				ClientAck localClientAck = null;
-				synchronized(lock)
-				{
-					localClientAck = SeqFileWriter.clientAck;
-					log.info("[" + Thread.currentThread().getName() + "] Client >>>>>>>>>>>> Current Ack object ===>>>>" + localClientAck.toString());
-					seqFileWriter.append(archiveKey, chunk);
-					
-					// compute size for stats
-					dataSize += chunk.getData().length;
-				}
-				localClientAck.wait4Ack();
-				
-				if (localClientAck.getStatus() != ClientAck.OK)
-				{
-					log.warn("Exception after notyfyAll on the lock - Thread:" + Thread.currentThread().getName(),localClientAck.getException());
-					throw new WriterException(localClientAck.getException());
-				}	
-				else
-				{
-					// sucess
-					writeChunkRetries = initWriteChunkRetries;
-				}
-			} 
-			catch (IOException e) 
-			{
-				writeChunkRetries --;
-				log.error("Could not save the chunk. ", e);
-	
-				if (writeChunkRetries < 0)
-				{
-					log.fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
-					System.exit(-1);
-				}
-				throw new WriterException(e);
-			}
-		}
-	}
-
-	public void close()
-	{
-		synchronized (lock)
-		{
-		  if (timer != null)
-			  timer.cancel();
-		  if (statTimer != null)
-			  statTimer.cancel();
-		  if (clientAckTimer != null)
-			  clientAckTimer.cancel();
-			try {
-				
-				if (this.currentOutputStr != null)
-				{
-					this.currentOutputStr.close();
-				}
-					
-				clientAck.releaseLock(ClientAck.OK, null);
-				fs.rename(currentPath, new Path(currentFileName + ".done"));
-			} catch (IOException e) 
-			{
-				clientAck.releaseLock(ClientAck.OK, e);
-				log.error("failed to close and rename stream", e);
-			}
-		}
-	}
+public class SeqFileWriter implements ChukwaWriter {
+  public static final boolean ENABLE_ROTATION = true;
+
+  static final int STAT_INTERVAL_SECONDS = 30;
+  static final Object lock = new Object();
+
+  static Logger log = Logger.getLogger(SeqFileWriter.class);
+
+  private FileSystem fs = null;
+  private Configuration conf = null;
+
+  private String outputDir = null;
+  private Calendar calendar = Calendar.getInstance();
+
+  private Path currentPath = null;
+  private String currentFileName = null;
+  private FSDataOutputStream currentOutputStr = null;
+  private static SequenceFile.Writer seqFileWriter = null;
+
+  private static ClientAck clientAck = new ClientAck();
+  private static long nextRotate = 0;
+  private static int rotateInterval = 1000 * 60;
+
+  private static Timer clientAckTimer = null;
+
+  private Timer timer = null;
+
+  private Timer statTimer = null;
+  private volatile long dataSize = 0;
+
+  private int initWriteChunkRetries = 10;
+  private int writeChunkRetries = initWriteChunkRetries;
+  private boolean chunksWrittenThisRotate = false;
+
+  public SeqFileWriter() throws WriterException {
+  }
+
+  public void init(Configuration conf) throws WriterException {
+    outputDir = conf.get("chukwaCollector.outputDir", "/chukwa");
+
+    rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
+        1000 * 60 * 5);// defaults to 5 minutes
+    nextRotate = System.currentTimeMillis() + rotateInterval;
+
+    initWriteChunkRetries = conf
+        .getInt("chukwaCollector.writeChunkRetries", 10);
+    writeChunkRetries = initWriteChunkRetries;
+
+    // check if they've told us the file system to use
+    String fsname = conf.get("writer.hdfs.filesystem");
+    if (fsname == null || fsname.equals("")) {
+      // otherwise try to get the filesystem from hadoop
+      fsname = conf.get("fs.default.name");
+    }
+
+    log.info("rotateInterval is " + rotateInterval);
+    log.info("outputDir is " + outputDir);
+    log.info("fsname is " + fsname);
+    log.info("filesystem type from core-default.xml is "
+        + conf.get("fs.hdfs.impl"));
+
+    if (fsname == null) {
+      log.error("no filesystem name");
+      throw new WriterException("no filesystem");
+    }
+    try {
+      fs = FileSystem.get(new URI(fsname), conf);
+      if (fs == null) {
+        log.error("can't connect to HDFS at " + fs.getUri());
+        return;
+      } else
+        log.info("filesystem is " + fs.getUri());
+    } catch (IOException e) {
+      log
+          .error(
+              "can't connect to HDFS, trying default file system instead (likely to be local)",
+              e);
+      try {
+        fs = FileSystem.get(conf);
+      } catch (IOException err) {
+        log.error("can't connect to default file system either", e);
+      }
+    } catch (URISyntaxException e) {
+      log.error("problem generating new URI from config setting");
+      return;
+    }
+
+    // Setup everything by rotating
+    rotate();
+
+    clientAckTimer = new Timer();
+    clientAckTimer.schedule(new TimerTask() {
+      public void run() {
+        synchronized (lock) {
+          ClientAck previous = clientAck;
+          SeqFileWriter.clientAck = new ClientAck();
+
+          try {
+            // SeqFile is uncompressed for now
+            // So we can flush every xx secs
+            // But if we're using block Compression
+            // this is not true anymore
+            // because this will trigger
+            // the compression
+            if (currentOutputStr != null) {
+              currentOutputStr.flush();
+            }
+            previous.releaseLock(ClientAck.OK, null);
+            long now = System.currentTimeMillis();
+            if (now >= nextRotate) {
+              nextRotate = System.currentTimeMillis() + rotateInterval;
+              rotate();
+            }
+          } catch (Throwable e) {
+            previous.releaseLock(ClientAck.KO, e);
+            log.warn("Exception when flushing ", e);
+            e.printStackTrace();
+          }
+        }
+      }
+
+    }, (5 * 1000), (5 * 1000));
+
+    statTimer = new Timer();
+    statTimer.schedule(new StatReportingTask(), 1000,
+        STAT_INTERVAL_SECONDS * 1000);
+
+  }
+
+  private class StatReportingTask extends TimerTask {
+    private long lastTs = System.currentTimeMillis();
+
+    public void run() {
+
+      long time = System.currentTimeMillis();
+      long currentDs = dataSize;
+      dataSize = 0;
+
+      long interval = time - lastTs;
+      lastTs = time;
+
+      long dataRate = 1000 * currentDs / interval; // kb/sec
+      log.info("stat:datacollection.writer.hdfs dataSize=" + currentDs
+          + " dataRate=" + dataRate);
+    }
+  };
+
+  void rotate() {
+    calendar.setTimeInMillis(System.currentTimeMillis());
+
+    log.info("start Date [" + calendar.getTime() + "]");
+    log.info("Rotate from " + Thread.currentThread().getName());
+
+    String newName = new java.text.SimpleDateFormat("yyyyddHHmmssSSS")
+        .format(calendar.getTime());
+    newName += "_" + new java.rmi.server.UID().toString();
+    newName = newName.replace("-", "");
+    newName = newName.replace(":", "");
+    newName = newName.replace(".", "");
+    newName = outputDir + "/" + newName.trim();
+
+    synchronized (lock) {
+      try {
+        FSDataOutputStream previousOutputStr = currentOutputStr;
+        Path previousPath = currentPath;
+        String previousFileName = currentFileName;
+
+        if (previousOutputStr != null) {
+          previousOutputStr.close();
+          if (chunksWrittenThisRotate) {
+            fs.rename(previousPath, new Path(previousFileName + ".done"));
+          } else {
+            log.info("no chunks written to " + previousPath + ", deleting");
+            fs.delete(previousPath, false);
+          }
+        }
+        Path newOutputPath = new Path(newName + ".chukwa");
+        FSDataOutputStream newOutputStr = fs.create(newOutputPath);
+        currentOutputStr = newOutputStr;
+        currentPath = newOutputPath;
+        currentFileName = newName;
+        chunksWrittenThisRotate = false;
+        // Uncompressed for now
+        seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
+            ChukwaArchiveKey.class, ChunkImpl.class,
+            SequenceFile.CompressionType.NONE, null);
+
+      } catch (IOException e) {
+        log.fatal("IO Exception in rotate. Exiting!");
+        e.printStackTrace();
+        // TODO
+        // As discussed for now:
+        // Everytime this happen in the past it was because HDFS was down,
+        // so there's nothing we can do
+        // Shutting down the collector for now
+        // Watchdog will re-start it automatically
+        System.exit(-1);
+      }
+    }
+
+    log.debug("finished rotate()");
+  }
+
+  // TODO merge the 2 add functions
+  @Override
+  public void add(List<Chunk> chunks) throws WriterException {
+    if (chunks != null) {
+      try {
+        chunksWrittenThisRotate = true;
+        ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+        // FIXME compute this once an hour
+        // 
+        synchronized (calendar) {
+          calendar.setTimeInMillis(System.currentTimeMillis());
+          calendar.set(Calendar.MINUTE, 0);
+          calendar.set(Calendar.SECOND, 0);
+          calendar.set(Calendar.MILLISECOND, 0);
+
+          archiveKey.setTimePartition(calendar.getTimeInMillis());
+        }
+
+        ClientAck localClientAck = null;
+        synchronized (lock) {
+          localClientAck = SeqFileWriter.clientAck;
+          for (Chunk chunk : chunks) {
+            archiveKey.setDataType(chunk.getDataType());
+            archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
+                + "/" + chunk.getStreamName());
+            archiveKey.setSeqId(chunk.getSeqID());
+
+            if (chunk != null) {
+              seqFileWriter.append(archiveKey, chunk);
+              // compute size for stats
+              dataSize += chunk.getData().length;
+            }
+
+          }
+        }// End synchro
+
+        localClientAck.wait4Ack();
+        if (localClientAck.getStatus() != ClientAck.OK) {
+          log
+              .warn("Exception after notyfyAll on the lock - Thread:"
+                  + Thread.currentThread().getName(), localClientAck
+                  .getException());
+          throw new WriterException(localClientAck.getException());
+        } else {
+          // sucess
+          writeChunkRetries = initWriteChunkRetries;
+        }
+
+      } catch (IOException e) {
+        writeChunkRetries--;
+        log.error("Could not save the chunk. ", e);
+
+        if (writeChunkRetries < 0) {
+          log
+              .fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
+          System.exit(-1);
+        }
+        throw new WriterException(e);
+      }
+    }
+
+  }
+
+  public void add(Chunk chunk) throws WriterException {
+
+    if (chunk != null) {
+      try {
+        ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+        // FIXME compute this once an hour
+        synchronized (calendar) {
+          calendar.setTimeInMillis(System.currentTimeMillis());
+          calendar.set(Calendar.MINUTE, 0);
+          calendar.set(Calendar.SECOND, 0);
+          calendar.set(Calendar.MILLISECOND, 0);
+
+          archiveKey.setTimePartition(calendar.getTimeInMillis());
+        }
+
+        archiveKey.setDataType(chunk.getDataType());
+        archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
+            + "/" + chunk.getStreamName());
+        archiveKey.setSeqId(chunk.getSeqID());
+
+        ClientAck localClientAck = null;
+        synchronized (lock) {
+          localClientAck = SeqFileWriter.clientAck;
+          log.info("[" + Thread.currentThread().getName()
+              + "] Client >>>>>>>>>>>> Current Ack object ===>>>>"
+              + localClientAck.toString());
+          seqFileWriter.append(archiveKey, chunk);
+
+          // compute size for stats
+          dataSize += chunk.getData().length;
+        }
+        localClientAck.wait4Ack();
+
+        if (localClientAck.getStatus() != ClientAck.OK) {
+          log
+              .warn("Exception after notyfyAll on the lock - Thread:"
+                  + Thread.currentThread().getName(), localClientAck
+                  .getException());
+          throw new WriterException(localClientAck.getException());
+        } else {
+          // sucess
+          writeChunkRetries = initWriteChunkRetries;
+        }
+      } catch (IOException e) {
+        writeChunkRetries--;
+        log.error("Could not save the chunk. ", e);
+
+        if (writeChunkRetries < 0) {
+          log
+              .fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
+          System.exit(-1);
+        }
+        throw new WriterException(e);
+      }
+    }
+  }
+
+  public void close() {
+    synchronized (lock) {
+      if (timer != null)
+        timer.cancel();
+      if (statTimer != null)
+        statTimer.cancel();
+      if (clientAckTimer != null)
+        clientAckTimer.cancel();
+      try {
+
+        if (this.currentOutputStr != null) {
+          this.currentOutputStr.close();
+        }
+
+        clientAck.releaseLock(ClientAck.OK, null);
+        fs.rename(currentPath, new Path(currentFileName + ".done"));
+      } catch (IOException e) {
+        clientAck.releaseLock(ClientAck.OK, e);
+        log.error("failed to close and rename stream", e);
+      }
+    }
+  }
 
 }
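
SeqFileWriter takes its behaviour from the configuration keys read in init() above. The sketch below shows one way to wire it up; the HDFS URI and output directory are placeholder values, and the numeric settings simply restate the defaults visible in the code.

// illustrative wiring; WriterException propagates to the caller
Configuration conf = new Configuration();
conf.set("chukwaCollector.outputDir", "/chukwa");             // default is /chukwa
conf.setInt("chukwaCollector.rotateInterval", 5 * 60 * 1000); // default is 5 minutes
conf.setInt("chukwaCollector.writeChunkRetries", 10);         // default is 10
conf.set("writer.hdfs.filesystem", "hdfs://namenode:9000");   // falls back to fs.default.name

SeqFileWriter writer = new SeqFileWriter();
writer.init(conf);  // connects to the filesystem and opens the first .chukwa sink file
// ... writer.add(chunks) as batches arrive ...
writer.close();     // renames the current sink file to <name>.done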

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java Wed Mar 11 22:39:26 2009
@@ -1,29 +1,26 @@
 package org.apache.hadoop.chukwa.datacollection.writer;
 
-public class WriterException extends Exception
-{
 
-	/**
+public class WriterException extends Exception {
+
+  /**
 	 * 
 	 */
-	private static final long serialVersionUID = -4207275200546397145L;
+  private static final long serialVersionUID = -4207275200546397145L;
 
-	public WriterException()
-	{}
+  public WriterException() {
+  }
 
-	public WriterException(String message)
-	{
-		super(message);
-	}
-
-	public WriterException(Throwable cause)
-	{
-		super(cause);
-	}
-
-	public WriterException(String message, Throwable cause)
-	{
-		super(message, cause);
-	}
+  public WriterException(String message) {
+    super(message);
+  }
+
+  public WriterException(Throwable cause) {
+    super(cause);
+  }
+
+  public WriterException(String message, Throwable cause) {
+    super(message, cause);
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChuckwaArchiveBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChuckwaArchiveBuilder.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChuckwaArchiveBuilder.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChuckwaArchiveBuilder.java Wed Mar 11 22:39:26 2009
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.chukwa.extraction.archive;
 
+
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.conf.Configuration;
@@ -33,78 +34,64 @@
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
 
-public class ChuckwaArchiveBuilder extends Configured implements Tool
-{
-	static Logger log = Logger.getLogger(ChuckwaArchiveBuilder.class);
-
-	static int printUsage()
-	{
-		System.out
-				.println("ChuckwaArchiveBuilder <Daily/Hourly> <input> <output>");
-		ToolRunner.printGenericCommandUsage(System.out);
-		return -1;
-	}
-	
-	public int run(String[] args) throws Exception
-	{
-	
-
-		// Make sure there are exactly 3 parameters left.
-		if (args.length != 3)
-		{
-			System.out.println("ERROR: Wrong number of parameters: "
-					+ args.length + " instead of 3.");
-			return printUsage();
-		}
-	
-		JobConf conf = new JobConf(getConf(), ChuckwaArchiveBuilder.class);
-
-		
-		conf.setInputFormat(SequenceFileInputFormat.class);
-		
-		conf.setMapperClass(IdentityMapper.class);
-		conf.setReducerClass(IdentityReducer.class);
-		
-		if (args[0].equalsIgnoreCase("Daily"))
-		{
-			conf.setPartitionerClass(ChukwaArchiveDailyPartitioner.class);
-			conf.setOutputFormat(ChukwaArchiveDailyOutputFormat.class);
-			conf.setJobName("Chukwa-DailyArchiveBuilder");
-		}
-		else if (args[0].equalsIgnoreCase("Hourly"))
-		{
-			conf.setJobName("Chukwa-HourlyArchiveBuilder");
-			conf.setPartitionerClass(ChukwaArchiveHourlyPartitioner.class);
-			conf.setOutputFormat(ChukwaArchiveHourlyOutputFormat.class);			
-		}
-		else
-		{
-			System.out.println("ERROR: Wrong Time partionning: "
-					+ args[0] + " instead of [Hourly/Daily].");
-			return printUsage();
-		}
-
-		
-		conf.setOutputKeyClass(ChukwaArchiveKey.class);
-		conf.setOutputValueClass(ChunkImpl.class);
-				
-		//FIXME need compression - read config
-		//conf.setCompressMapOutput(true);
-		//conf.setMapOutputCompressorClass(LzoCodec.class);
-		
-		//
-		
-		FileInputFormat.setInputPaths(conf, args[1]);
-		FileOutputFormat.setOutputPath(conf, new Path(args[2]));
-
-		JobClient.runJob(conf);
-		return 0;
-	}
-
-	public static void main(String[] args) throws Exception
-	{
-		int res = ToolRunner.run(new Configuration(),
-				new ChuckwaArchiveBuilder(), args);
-		System.exit(res);
-	}
+public class ChuckwaArchiveBuilder extends Configured implements Tool {
+  static Logger log = Logger.getLogger(ChuckwaArchiveBuilder.class);
+
+  static int printUsage() {
+    System.out.println("ChuckwaArchiveBuilder <Daily/Hourly> <input> <output>");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return -1;
+  }
+
+  public int run(String[] args) throws Exception {
+
+    // Make sure there are exactly 3 parameters left.
+    if (args.length != 3) {
+      System.out.println("ERROR: Wrong number of parameters: " + args.length
+          + " instead of 3.");
+      return printUsage();
+    }
+
+    JobConf conf = new JobConf(getConf(), ChuckwaArchiveBuilder.class);
+
+    conf.setInputFormat(SequenceFileInputFormat.class);
+
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+
+    if (args[0].equalsIgnoreCase("Daily")) {
+      conf.setPartitionerClass(ChukwaArchiveDailyPartitioner.class);
+      conf.setOutputFormat(ChukwaArchiveDailyOutputFormat.class);
+      conf.setJobName("Chukwa-DailyArchiveBuilder");
+    } else if (args[0].equalsIgnoreCase("Hourly")) {
+      conf.setJobName("Chukwa-HourlyArchiveBuilder");
+      conf.setPartitionerClass(ChukwaArchiveHourlyPartitioner.class);
+      conf.setOutputFormat(ChukwaArchiveHourlyOutputFormat.class);
+    } else {
+      System.out.println("ERROR: Wrong Time partionning: " + args[0]
+          + " instead of [Hourly/Daily].");
+      return printUsage();
+    }
+
+    conf.setOutputKeyClass(ChukwaArchiveKey.class);
+    conf.setOutputValueClass(ChunkImpl.class);
+
+    // FIXME need compression - read config
+    // conf.setCompressMapOutput(true);
+    // conf.setMapOutputCompressorClass(LzoCodec.class);
+
+    //
+
+    FileInputFormat.setInputPaths(conf, args[1]);
+    FileOutputFormat.setOutputPath(conf, new Path(args[2]));
+
+    JobClient.runJob(conf);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new ChuckwaArchiveBuilder(),
+        args);
+    System.exit(res);
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.extraction.archive;
 
+
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.conf.Configuration;
@@ -34,99 +35,82 @@
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
 
-public class ChukwaArchiveBuilder extends Configured implements Tool
-{
-	static Logger log = Logger.getLogger(ChukwaArchiveBuilder.class);
-
-	static int printUsage()
-	{
-		System.out
-				.println("ChuckwaArchiveBuilder <Stream/DataType/Daily/Hourly> <input> <output>");
-		ToolRunner.printGenericCommandUsage(System.out);
-		return -1;
-	}
-	
-	public int run(String[] args) throws Exception
-	{
-	
-
-		// Make sure there are exactly 3 parameters left.
-		if (args.length != 3)
-		{
-			System.out.println("ERROR: Wrong number of parameters: "
-					+ args.length + " instead of 3.");
-			return printUsage();
-		}
-	
-		JobConf jobConf = new JobConf(getConf(), ChukwaArchiveBuilder.class);
-		jobConf.addResource(new Path("conf/chukwa-demux-conf.xml"));
-		
-		jobConf.setInputFormat(SequenceFileInputFormat.class);
-		
-		jobConf.setMapperClass(IdentityMapper.class);
-		jobConf.setReducerClass(IdentityReducer.class);
-		
-		if (args[0].equalsIgnoreCase("Daily"))
-		{
-			jobConf.setPartitionerClass(ChukwaArchiveDailyPartitioner.class);
-			jobConf.setOutputFormat(ChukwaArchiveDailyOutputFormat.class);
-			jobConf.setJobName("Chukwa-DailyArchiveBuilder");
-		}
-		else if (args[0].equalsIgnoreCase("Hourly"))
-		{
-			jobConf.setJobName("Chukwa-HourlyArchiveBuilder");
-			jobConf.setPartitionerClass(ChukwaArchiveHourlyPartitioner.class);
-			jobConf.setOutputFormat(ChukwaArchiveHourlyOutputFormat.class);			
-		}
-		else if (args[0].equalsIgnoreCase("DataType"))
-    {
+public class ChukwaArchiveBuilder extends Configured implements Tool {
+  static Logger log = Logger.getLogger(ChukwaArchiveBuilder.class);
+
+  static int printUsage() {
+    System.out
+        .println("ChuckwaArchiveBuilder <Stream/DataType/Daily/Hourly> <input> <output>");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return -1;
+  }
+
+  public int run(String[] args) throws Exception {
+
+    // Make sure there are exactly 3 parameters left.
+    if (args.length != 3) {
+      System.out.println("ERROR: Wrong number of parameters: " + args.length
+          + " instead of 3.");
+      return printUsage();
+    }
+
+    JobConf jobConf = new JobConf(getConf(), ChukwaArchiveBuilder.class);
+    jobConf.addResource(new Path("conf/chukwa-demux-conf.xml"));
+
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+
+    jobConf.setMapperClass(IdentityMapper.class);
+    jobConf.setReducerClass(IdentityReducer.class);
+
+    if (args[0].equalsIgnoreCase("Daily")) {
+      jobConf.setPartitionerClass(ChukwaArchiveDailyPartitioner.class);
+      jobConf.setOutputFormat(ChukwaArchiveDailyOutputFormat.class);
+      jobConf.setJobName("Chukwa-DailyArchiveBuilder");
+    } else if (args[0].equalsIgnoreCase("Hourly")) {
+      jobConf.setJobName("Chukwa-HourlyArchiveBuilder");
+      jobConf.setPartitionerClass(ChukwaArchiveHourlyPartitioner.class);
+      jobConf.setOutputFormat(ChukwaArchiveHourlyOutputFormat.class);
+    } else if (args[0].equalsIgnoreCase("DataType")) {
       jobConf.setJobName("Chukwa-HourlyArchiveBuilder-DataType");
       int reduceCount = jobConf.getInt("chukwaArchiveBuilder.reduceCount", 1);
       log.info("Reduce Count:" + reduceCount);
       jobConf.setNumReduceTasks(reduceCount);
-      
+
       jobConf.setPartitionerClass(ChukwaArchiveDataTypePartitioner.class);
-      jobConf.setOutputFormat(ChukwaArchiveDataTypeOutputFormat.class);     
-    }
-		else if (args[0].equalsIgnoreCase("Stream"))
-    {
+      jobConf.setOutputFormat(ChukwaArchiveDataTypeOutputFormat.class);
+    } else if (args[0].equalsIgnoreCase("Stream")) {
       jobConf.setJobName("Chukwa-HourlyArchiveBuilder-Stream");
       int reduceCount = jobConf.getInt("chukwaArchiveBuilder.reduceCount", 1);
       log.info("Reduce Count:" + reduceCount);
       jobConf.setNumReduceTasks(reduceCount);
-      
+
       jobConf.setPartitionerClass(ChukwaArchiveStreamNamePartitioner.class);
-      jobConf.setOutputFormat(ChukwaArchiveStreamNameOutputFormat.class);     
+      jobConf.setOutputFormat(ChukwaArchiveStreamNameOutputFormat.class);
+    } else {
+      System.out.println("ERROR: Wrong Time partionning: " + args[0]
+          + " instead of [Stream/DataType/Hourly/Daily].");
+      return printUsage();
     }
-		else
-		{
-			System.out.println("ERROR: Wrong Time partionning: "
-					+ args[0] + " instead of [Stream/DataType/Hourly/Daily].");
-			return printUsage();
-		}
-
-    
-		jobConf.set("mapred.compress.map.output", "true");
-		jobConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.LzoCodec");
-		jobConf.set("mapred.output.compress", "true");
-		jobConf.set("mapred.output.compression.type", "BLOCK");
-	
-		
-		
-		jobConf.setOutputKeyClass(ChukwaArchiveKey.class);
-		jobConf.setOutputValueClass(ChunkImpl.class);
-	
-		FileInputFormat.setInputPaths(jobConf, args[1]);
-		FileOutputFormat.setOutputPath(jobConf, new Path(args[2]));
-
-		JobClient.runJob(jobConf);
-		return 0;
-	}
-
-	public static void main(String[] args) throws Exception
-	{
-		int res = ToolRunner.run(new Configuration(),
-				new ChukwaArchiveBuilder(), args);
-		System.exit(res);
-	}
+
+    jobConf.set("mapred.compress.map.output", "true");
+    jobConf.set("mapred.map.output.compression.codec",
+        "org.apache.hadoop.io.compress.LzoCodec");
+    jobConf.set("mapred.output.compress", "true");
+    jobConf.set("mapred.output.compression.type", "BLOCK");
+
+    jobConf.setOutputKeyClass(ChukwaArchiveKey.class);
+    jobConf.setOutputValueClass(ChunkImpl.class);
+
+    FileInputFormat.setInputPaths(jobConf, args[1]);
+    FileOutputFormat.setOutputPath(jobConf, new Path(args[2]));
+
+    JobClient.runJob(jobConf);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new ChukwaArchiveBuilder(),
+        args);
+    System.exit(res);
+  }
 }

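ChukwaArchiveBuilder implements the standard Hadoop Tool interface, so besides the command line shown in printUsage() it can be driven through ToolRunner. Below is a minimal driver sketch, assuming the class sits in org.apache.hadoop.chukwa.extraction.archive with the partitioners that follow; the input and output paths are illustrative placeholders, not paths from this commit.

// Minimal driver sketch for ChukwaArchiveBuilder (paths are placeholders).
import org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class RunDailyArchive {
  public static void main(String[] args) throws Exception {
    String[] builderArgs = {
        "Daily",                     // one of Stream/DataType/Daily/Hourly
        "/chukwa/demuxProcessing",   // input directory of chunk sequence files (placeholder)
        "/chukwa/archives/daily"     // output directory for the .arc archives (placeholder)
    };
    int res = ToolRunner.run(new Configuration(), new ChukwaArchiveBuilder(),
        builderArgs);
    System.exit(res);
  }
}
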
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java Wed Mar 11 22:39:26 2009
@@ -20,26 +20,25 @@
 
 
 import java.text.SimpleDateFormat;
-
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
 import org.apache.log4j.Logger;
 
-public class ChukwaArchiveDailyOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl>
-{
-	static Logger log = Logger.getLogger(ChukwaArchiveDailyOutputFormat.class);
-	SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
-	
-	
-	@Override
-	protected String generateFileNameForKeyValue(ChukwaArchiveKey key, ChunkImpl chunk,
-			String name)
-	{
-		
-		if (log.isDebugEnabled())
-			{log.debug("ChukwaArchiveOutputFormat.fileName: " + sdf.format(key.getTimePartition()));}
-		
-		return sdf.format(key.getTimePartition()) + ".arc";
-	}
+public class ChukwaArchiveDailyOutputFormat extends
+    MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl> {
+  static Logger log = Logger.getLogger(ChukwaArchiveDailyOutputFormat.class);
+  SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
+
+  @Override
+  protected String generateFileNameForKeyValue(ChukwaArchiveKey key,
+      ChunkImpl chunk, String name) {
+
+    if (log.isDebugEnabled()) {
+      log.debug("ChukwaArchiveOutputFormat.fileName: "
+          + sdf.format(key.getTimePartition()));
+    }
+
+    return sdf.format(key.getTimePartition()) + ".arc";
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyPartitioner.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyPartitioner.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyPartitioner.java Wed Mar 11 22:39:26 2009
@@ -18,25 +18,25 @@
 
 package org.apache.hadoop.chukwa.extraction.archive;
 
-import java.text.SimpleDateFormat;
 
+import java.text.SimpleDateFormat;
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 
-public class ChukwaArchiveDailyPartitioner<K, V> 
-	implements Partitioner<ChukwaArchiveKey,ChunkImpl>
-{
-	SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
-	
-	public void configure(JobConf arg0)
-	{}
-
-	public int getPartition(ChukwaArchiveKey key,ChunkImpl chunl, int numReduceTasks)
-	{
-		
-		 return (sdf.format(key.getTimePartition()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
-	}
+public class ChukwaArchiveDailyPartitioner<K, V> implements
+    Partitioner<ChukwaArchiveKey, ChunkImpl> {
+  SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
+
+  public void configure(JobConf arg0) {
+  }
+
+  public int getPartition(ChukwaArchiveKey key, ChunkImpl chunl,
+      int numReduceTasks) {
+
+    return (sdf.format(key.getTimePartition()).hashCode() & Integer.MAX_VALUE)
+        % numReduceTasks;
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java Wed Mar 11 22:39:26 2009
@@ -20,26 +20,26 @@
 
 
 import java.text.SimpleDateFormat;
-
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
 import org.apache.log4j.Logger;
 
-public class ChukwaArchiveDataTypeOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl>
-{
-	static Logger log = Logger.getLogger(ChukwaArchiveDataTypeOutputFormat.class);
-	SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
-	
-	
-	@Override
-	protected String generateFileNameForKeyValue(ChukwaArchiveKey key, ChunkImpl chunk,
-			String name)
-	{
-		
-		if (log.isDebugEnabled())
-			{log.debug("ChukwaArchiveOutputFormat.fileName: " + sdf.format(key.getTimePartition()));}
-		
-		return chunk.getDataType() + "_" +sdf.format(key.getTimePartition()) + ".arc";
-	}
+public class ChukwaArchiveDataTypeOutputFormat extends
+    MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl> {
+  static Logger log = Logger.getLogger(ChukwaArchiveDataTypeOutputFormat.class);
+  SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
+
+  @Override
+  protected String generateFileNameForKeyValue(ChukwaArchiveKey key,
+      ChunkImpl chunk, String name) {
+
+    if (log.isDebugEnabled()) {
+      log.debug("ChukwaArchiveOutputFormat.fileName: "
+          + sdf.format(key.getTimePartition()));
+    }
+
+    return chunk.getDataType() + "_" + sdf.format(key.getTimePartition())
+        + ".arc";
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java Wed Mar 11 22:39:26 2009
@@ -18,25 +18,26 @@
 
 package org.apache.hadoop.chukwa.extraction.archive;
 
-import java.text.SimpleDateFormat;
 
+import java.text.SimpleDateFormat;
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 
-public class ChukwaArchiveDataTypePartitioner<K, V> 
-	implements Partitioner<ChukwaArchiveKey,ChunkImpl>
-{
-	SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
-	
-	public void configure(JobConf arg0)
-	{}
-
-	public int getPartition(ChukwaArchiveKey key,ChunkImpl chunk, int numReduceTasks)
-	{
-		
-		 return ( (chunk.getDataType() + "_" +sdf.format(key.getTimePartition() )).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
-	}
+public class ChukwaArchiveDataTypePartitioner<K, V> implements
+    Partitioner<ChukwaArchiveKey, ChunkImpl> {
+  SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
+
+  public void configure(JobConf arg0) {
+  }
+
+  public int getPartition(ChukwaArchiveKey key, ChunkImpl chunk,
+      int numReduceTasks) {
+
+    return ((chunk.getDataType() + "_" + sdf.format(key.getTimePartition()))
+        .hashCode() & Integer.MAX_VALUE)
+        % numReduceTasks;
+  }
 
 }

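The daily, hourly, and data-type partitioners all apply the same rule: build a grouping key, mask its hashCode with Integer.MAX_VALUE so a negative hash cannot produce a negative partition, and take the result modulo the reducer count. A standalone sketch of that rule, with a placeholder data type and reducer count:

// Standalone sketch of the archive partitioning rule (values are placeholders).
import java.text.SimpleDateFormat;

public class ArchivePartitionSketch {
  public static void main(String[] args) {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
    String dataType = "HadoopLog";        // placeholder data type
    long timePartition = System.currentTimeMillis();
    int numReduceTasks = 8;               // placeholder reducer count

    // Same key shape as ChukwaArchiveDataTypePartitioner: "<dataType>_yyyy_MM_dd".
    String key = dataType + "_" + sdf.format(timePartition);
    // The mask clears the sign bit, keeping the index in [0, numReduceTasks).
    int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    System.out.println(key + " -> reducer " + partition);
  }
}
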
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyOutputFormat.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyOutputFormat.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyOutputFormat.java Wed Mar 11 22:39:26 2009
@@ -20,24 +20,24 @@
 
 
 import java.text.SimpleDateFormat;
-
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
 import org.apache.log4j.Logger;
 
-public class ChukwaArchiveHourlyOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl>
-{
-	static Logger log = Logger.getLogger(ChukwaArchiveHourlyOutputFormat.class);
-	SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd_HH-00");
-	
-	@Override
-	protected String generateFileNameForKeyValue(ChukwaArchiveKey key, ChunkImpl chunk,
-			String name)
-	{
-		
-		if (log.isDebugEnabled())
-			{log.debug("ChukwaArchiveOutputFormat.fileName: " + sdf.format(key.getTimePartition()));}
-		return sdf.format(key.getTimePartition()) + ".arc";
-	}
+public class ChukwaArchiveHourlyOutputFormat extends
+    MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl> {
+  static Logger log = Logger.getLogger(ChukwaArchiveHourlyOutputFormat.class);
+  SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd_HH-00");
+
+  @Override
+  protected String generateFileNameForKeyValue(ChukwaArchiveKey key,
+      ChunkImpl chunk, String name) {
+
+    if (log.isDebugEnabled()) {
+      log.debug("ChukwaArchiveOutputFormat.fileName: "
+          + sdf.format(key.getTimePartition()));
+    }
+    return sdf.format(key.getTimePartition()) + ".arc";
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyPartitioner.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyPartitioner.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyPartitioner.java Wed Mar 11 22:39:26 2009
@@ -18,25 +18,25 @@
 
 package org.apache.hadoop.chukwa.extraction.archive;
 
-import java.text.SimpleDateFormat;
 
+import java.text.SimpleDateFormat;
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 
-public class ChukwaArchiveHourlyPartitioner<K, V> 
-	implements Partitioner<ChukwaArchiveKey,ChunkImpl>
-{
-	SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd_HH-00");
-	
-	public void configure(JobConf arg0)
-	{}
-
-	public int getPartition(ChukwaArchiveKey key,ChunkImpl chunl, int numReduceTasks)
-	{
-		
-		 return (sdf.format(key.getTimePartition()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
-	}
+public class ChukwaArchiveHourlyPartitioner<K, V> implements
+    Partitioner<ChukwaArchiveKey, ChunkImpl> {
+  SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd_HH-00");
+
+  public void configure(JobConf arg0) {
+  }
+
+  public int getPartition(ChukwaArchiveKey key, ChunkImpl chunl,
+      int numReduceTasks) {
+
+    return (sdf.format(key.getTimePartition()).hashCode() & Integer.MAX_VALUE)
+        % numReduceTasks;
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java Wed Mar 11 22:39:26 2009
@@ -1,6 +1,6 @@
 package org.apache.hadoop.chukwa.extraction.archive;
 
-public class ChukwaArchiveMerger
-{
+
+public class ChukwaArchiveMerger {
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNameOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNameOutputFormat.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNameOutputFormat.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNameOutputFormat.java Wed Mar 11 22:39:26 2009
@@ -23,13 +23,12 @@
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
 
-public class ChukwaArchiveStreamNameOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl>
-{
+public class ChukwaArchiveStreamNameOutputFormat extends
+    MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl> {
 
   @Override
-  protected String generateLeafFileName(String name)
-  {
+  protected String generateLeafFileName(String name) {
     return "chukwaArchive-" + super.generateLeafFileName(name);
   }
-  
+
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNamePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNamePartitioner.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNamePartitioner.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNamePartitioner.java Wed Mar 11 22:39:26 2009
@@ -18,21 +18,22 @@
 
 package org.apache.hadoop.chukwa.extraction.archive;
 
+
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 
-public class ChukwaArchiveStreamNamePartitioner<K, V> 
-	implements Partitioner<ChukwaArchiveKey,ChunkImpl>
-{
-	public void configure(JobConf arg0)
-	{}
-
-	public int getPartition(ChukwaArchiveKey key,ChunkImpl chunk, int numReduceTasks)
-	{
-		
-		 return ( (chunk.getSource() + "/" + chunk.getStreamName()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
-	}
+public class ChukwaArchiveStreamNamePartitioner<K, V> implements
+    Partitioner<ChukwaArchiveKey, ChunkImpl> {
+  public void configure(JobConf arg0) {
+  }
+
+  public int getPartition(ChukwaArchiveKey key, ChunkImpl chunk,
+      int numReduceTasks) {
+
+    return ((chunk.getSource() + "/" + chunk.getStreamName()).hashCode() & Integer.MAX_VALUE)
+        % numReduceTasks;
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBException.java Wed Mar 11 22:39:26 2009
@@ -1,30 +1,26 @@
 package org.apache.hadoop.chukwa.extraction.database;
 
-public class DBException extends Exception
-{
 
-	/**
+public class DBException extends Exception {
+
+  /**
 	 * 
 	 */
-	private static final long serialVersionUID = -4509384580029389936L;
+  private static final long serialVersionUID = -4509384580029389936L;
+
+  public DBException() {
+  }
 
-	public DBException()
-	{
-	}
-
-	public DBException(String message)
-	{
-		super(message);
-	}
-
-	public DBException(Throwable cause)
-	{
-		super(cause);
-	}
-
-	public DBException(String message, Throwable cause)
-	{
-		super(message, cause);
-	}
+  public DBException(String message) {
+    super(message);
+  }
+
+  public DBException(Throwable cause) {
+    super(cause);
+  }
+
+  public DBException(String message, Throwable cause) {
+    super(message, cause);
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBPlugin.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBPlugin.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBPlugin.java Wed Mar 11 22:39:26 2009
@@ -1,9 +1,8 @@
 package org.apache.hadoop.chukwa.extraction.database;
 
+
 import org.apache.hadoop.io.SequenceFile;
 
-public interface DBPlugin
-{
-	void process(SequenceFile.Reader reader)
-	throws DBException;
+public interface DBPlugin {
+  void process(SequenceFile.Reader reader) throws DBException;
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java Wed Mar 11 22:39:26 2009
@@ -1,10 +1,10 @@
 package org.apache.hadoop.chukwa.extraction.database;
 
+
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.sql.SQLException;
 import java.util.HashMap;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
@@ -13,125 +13,105 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 
-public class DatabaseLoader
-{
+public class DatabaseLoader {
 
-	static HashMap<String, String> hashDatasources = new HashMap<String, String>();
-	static ChukwaConfiguration conf = null;
-	static FileSystem fs = null;
-	
-	private static Log log = LogFactory.getLog(DatabaseLoader.class);
-
-	/**
-	 * @param args
-	 * @throws URISyntaxException
-	 * @throws IOException
-	 */
-	public static void main(String[] args) throws IOException,
-			URISyntaxException
-	{
-		//FIXME quick implementation to be able to load data into database
-		
-		System.out.println("Input directory:" + args[0]);
-
-		
-		for(int i=1;i<args.length;i++)
-		{
-			hashDatasources.put(args[i], "");
-		}
-		
-		conf = new ChukwaConfiguration();
-		fs = FileSystem.get(conf);
-		Path demuxDir = new Path(args[0]);
-		FileStatus fstat = fs.getFileStatus(demuxDir);
-
-		if (!fstat.isDir())
-		{
-			throw new IOException(args[0] + " is not a directory!");
-		} 
-		else
-		{
-			// cluster Directory
-			FileStatus[] clusterDirectories = fs.listStatus(demuxDir);
-			for (FileStatus clusterDirectory : clusterDirectories)
-			{
-				FileStatus[] datasourceDirectories = fs.listStatus(clusterDirectory.getPath());
-				
-				String directoryName = null;
-				for (FileStatus datasourceDirectory : datasourceDirectories)
-				{
-					directoryName = datasourceDirectory.getPath().getName();
-					if (directoryName.equals("_log") || (!hashDatasources.containsKey(directoryName)))
-					{
-						log.info("Skipping this directory:" + directoryName );
-						continue;
-					}
-					try
-					{
-						processDS(clusterDirectory.getPath().getName(),datasourceDirectory.getPath());
-					}
-					catch(Exception e)
-					{
-						e.printStackTrace();
-						log.warn("Exception in DatabaseLoader:" ,e);
-					}
-				}
-			}
-			
-			System.exit(0);
-		}
-	}
-		
-	static void processDS(String cluster, Path datasourcePath) throws IOException
-	{
-		Path srcDir = datasourcePath;
-		FileStatus fstat = fs.getFileStatus(srcDir);
-
-		if (!fstat.isDir())
-		{
-			throw new IOException(datasourcePath.getName() + " is not a directory!");
-		} else
-		{
-			FileStatus[] datasourceDirectories = fs.listStatus(srcDir,new EventFileFilter());
-			for (FileStatus datasourceDirectory : datasourceDirectories)
-			{
-				String dataSource = datasourceDirectory.getPath().getName();
-				dataSource = dataSource.substring(0,dataSource.indexOf('_'));
-				
-				
-				// Need to rename if we want todo some processing in para.
-				//
-				// Maybe the file has already been processed by another loader
-				if (fs.exists(datasourceDirectory.getPath()))
-				{
-					
-					log.info("Processing: " + datasourceDirectory.getPath().getName());
-					
-					try
-					{
-						MetricDataLoader mdl = new MetricDataLoader(cluster);
-						mdl.process(datasourceDirectory.getPath());
-					} catch (SQLException e)
-					{
-						e.printStackTrace();
-						log.warn("SQLException in MetricDataLoader:" ,e);
-					} catch (URISyntaxException e)
-					{
-						e.printStackTrace();
-						log.warn("Exception in MetricDataLoader:" ,e);
-					}
-					
-					log.info("Processed: " + datasourceDirectory.getPath().getName());
-				}
-			} // End for(FileStatus datasourceDirectory :datasourceDirectories)
-		} // End Else
-	}
+  static HashMap<String, String> hashDatasources = new HashMap<String, String>();
+  static ChukwaConfiguration conf = null;
+  static FileSystem fs = null;
+
+  private static Log log = LogFactory.getLog(DatabaseLoader.class);
+
+  /**
+   * @param args
+   * @throws URISyntaxException
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException, URISyntaxException {
+    // FIXME quick implementation to be able to load data into database
+
+    System.out.println("Input directory:" + args[0]);
+
+    for (int i = 1; i < args.length; i++) {
+      hashDatasources.put(args[i], "");
+    }
+
+    conf = new ChukwaConfiguration();
+    fs = FileSystem.get(conf);
+    Path demuxDir = new Path(args[0]);
+    FileStatus fstat = fs.getFileStatus(demuxDir);
+
+    if (!fstat.isDir()) {
+      throw new IOException(args[0] + " is not a directory!");
+    } else {
+      // cluster Directory
+      FileStatus[] clusterDirectories = fs.listStatus(demuxDir);
+      for (FileStatus clusterDirectory : clusterDirectories) {
+        FileStatus[] datasourceDirectories = fs.listStatus(clusterDirectory
+            .getPath());
+
+        String directoryName = null;
+        for (FileStatus datasourceDirectory : datasourceDirectories) {
+          directoryName = datasourceDirectory.getPath().getName();
+          if (directoryName.equals("_log")
+              || (!hashDatasources.containsKey(directoryName))) {
+            log.info("Skipping this directory:" + directoryName);
+            continue;
+          }
+          try {
+            processDS(clusterDirectory.getPath().getName(), datasourceDirectory
+                .getPath());
+          } catch (Exception e) {
+            e.printStackTrace();
+            log.warn("Exception in DatabaseLoader:", e);
+          }
+        }
+      }
+
+      System.exit(0);
+    }
+  }
+
+  static void processDS(String cluster, Path datasourcePath) throws IOException {
+    Path srcDir = datasourcePath;
+    FileStatus fstat = fs.getFileStatus(srcDir);
+
+    if (!fstat.isDir()) {
+      throw new IOException(datasourcePath.getName() + " is not a directory!");
+    } else {
+      FileStatus[] datasourceDirectories = fs.listStatus(srcDir,
+          new EventFileFilter());
+      for (FileStatus datasourceDirectory : datasourceDirectories) {
+        String dataSource = datasourceDirectory.getPath().getName();
+        dataSource = dataSource.substring(0, dataSource.indexOf('_'));
+
+        // Need to rename if we want to do some processing in parallel.
+        //
+        // Maybe the file has already been processed by another loader
+        if (fs.exists(datasourceDirectory.getPath())) {
+
+          log.info("Processing: " + datasourceDirectory.getPath().getName());
+
+          try {
+            MetricDataLoader mdl = new MetricDataLoader(cluster);
+            mdl.process(datasourceDirectory.getPath());
+          } catch (SQLException e) {
+            e.printStackTrace();
+            log.warn("SQLException in MetricDataLoader:", e);
+          } catch (URISyntaxException e) {
+            e.printStackTrace();
+            log.warn("Exception in MetricDataLoader:", e);
+          }
+
+          log.info("Processed: " + datasourceDirectory.getPath().getName());
+        }
+      } // End for(FileStatus datasourceDirectory :datasourceDirectories)
+    } // End Else
+  }
 }
 
-class EventFileFilter implements PathFilter
-{
-	  public boolean accept(Path path) 
-	  {
-	    return (path.toString().endsWith(".evt"));
-	  }
+
+class EventFileFilter implements PathFilter {
+  public boolean accept(Path path) {
+    return (path.toString().endsWith(".evt"));
+  }
 }

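DatabaseLoader.main() takes the demux output directory as its first argument, followed by the names of the datasource directories to load; anything else, including "_log", is skipped, and each .evt path that survives the filter is handed to MetricDataLoader. A hedged invocation sketch, with placeholder paths and datasource names:

// Hypothetical invocation of DatabaseLoader (path and datasource names are placeholders).
import org.apache.hadoop.chukwa.extraction.database.DatabaseLoader;

public class LoadDatasources {
  public static void main(String[] args) throws Exception {
    String[] loaderArgs = {
        "/chukwa/postProcess/demuxOutput",  // demux output root (placeholder)
        "SystemMetrics",                    // datasource directories to load (placeholders)
        "Df"
    };
    DatabaseLoader.main(loaderArgs);  // note: calls System.exit(0) when done
  }
}
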
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MRJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MRJobCounters.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MRJobCounters.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MRJobCounters.java Wed Mar 11 22:39:26 2009
@@ -1,74 +1,74 @@
 package org.apache.hadoop.chukwa.extraction.database;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.io.SequenceFile.Reader;
 
-public class MRJobCounters implements DBPlugin
-{
-	private static Log log = LogFactory.getLog(MRJobCounters.class);
-	
-	static final String[] fields = 
-		{"FILE_SYSTEMS_HDFS_BYTES_READ","FILE_SYSTEMS_HDFS_BYTES_WRITTEN",
-		"FILE_SYSTEMS_LOCAL_BYTES_READ","FILE_SYSTEMS_LOCAL_BYTES_WRITTEN","HodId",
-		"JOB_COUNTERS__DATA-LOCAL_MAP_TASKS","JOB_COUNTERS__LAUNCHED_MAP_TASKS",
-		"JOB_COUNTERS__LAUNCHED_REDUCE_TASKS","JOB_COUNTERS__RACK-LOCAL_MAP_TASKS",
-		"JobId","MAP-REDUCE_FRAMEWORK_COMBINE_INPUT_RECORDS","MAP-REDUCE_FRAMEWORK_COMBINE_OUTPUT_RECORDS",
-		"MAP-REDUCE_FRAMEWORK_MAP_INPUT_BYTES","MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS",
-		"MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES","MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS",
-		"MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES","MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS",
-		"MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_GROUPS","MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_RECORDS",
-		"MAP-REDUCE_FRAMEWORK_REDUCE_OUTPUT_RECORDS"};
-//	[FILE_SYSTEMS_HDFS_BYTES_READ] :801280331655
-//	[FILE_SYSTEMS_HDFS_BYTES_WRITTEN] :44142889
-//	[FILE_SYSTEMS_LOCAL_BYTES_READ] :1735570776310
-//	[FILE_SYSTEMS_LOCAL_BYTES_WRITTEN] :2610893176016
-//	[HodId] :0.0
-//	[JOB_COUNTERS__DATA-LOCAL_MAP_TASKS] :5545
-//	[JOB_COUNTERS__LAUNCHED_MAP_TASKS] :5912
-//	[JOB_COUNTERS__LAUNCHED_REDUCE_TASKS] :739
-//	[JOB_COUNTERS__RACK-LOCAL_MAP_TASKS] :346
-//	[JobId] :2.008042104030008E15
-//	[MAP-REDUCE_FRAMEWORK_COMBINE_INPUT_RECORDS] :0
-//	[MAP-REDUCE_FRAMEWORK_COMBINE_OUTPUT_RECORDS] :0
-//	[MAP-REDUCE_FRAMEWORK_MAP_INPUT_BYTES] :801273929542
-//	[MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS] :9406887059
-//	[MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES] :784109666437
-//	[MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS] :9406887059
-//	[MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_GROUPS] :477623
-//	[MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_RECORDS] :739000
-//	[MAP-REDUCE_FRAMEWORK_REDUCE_OUTPUT_RECORDS] :739000
-
-	
-	@Override
-	public void process(Reader reader) throws DBException
-	{
-		ChukwaRecordKey key = new ChukwaRecordKey();
-		ChukwaRecord record = new ChukwaRecord();
-		try
-		{
-			StringBuilder sb = new StringBuilder();
-			while (reader.next(key, record))
-			{
-				
-				sb.append("insert into MRJobCounters ");
-				for (String field :fields)
-				{
-					sb.append(" set ").append(field).append(" = ").append(record.getValue(field)).append(", ");
-				}
-				sb.append(" set timestamp =").append( record.getTime()).append(";\n");
-			} 
-			System.out.println(sb.toString());
-		} 
-		catch (Exception e)
-		{
-			log.error("Unable to insert data into database"
-					+ e.getMessage());
-			e.printStackTrace();
-		} 
+public class MRJobCounters implements DBPlugin {
+  private static Log log = LogFactory.getLog(MRJobCounters.class);
+
+  static final String[] fields = { "FILE_SYSTEMS_HDFS_BYTES_READ",
+      "FILE_SYSTEMS_HDFS_BYTES_WRITTEN", "FILE_SYSTEMS_LOCAL_BYTES_READ",
+      "FILE_SYSTEMS_LOCAL_BYTES_WRITTEN", "HodId",
+      "JOB_COUNTERS__DATA-LOCAL_MAP_TASKS", "JOB_COUNTERS__LAUNCHED_MAP_TASKS",
+      "JOB_COUNTERS__LAUNCHED_REDUCE_TASKS",
+      "JOB_COUNTERS__RACK-LOCAL_MAP_TASKS", "JobId",
+      "MAP-REDUCE_FRAMEWORK_COMBINE_INPUT_RECORDS",
+      "MAP-REDUCE_FRAMEWORK_COMBINE_OUTPUT_RECORDS",
+      "MAP-REDUCE_FRAMEWORK_MAP_INPUT_BYTES",
+      "MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS",
+      "MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES",
+      "MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS",
+      "MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES",
+      "MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS",
+      "MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_GROUPS",
+      "MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_RECORDS",
+      "MAP-REDUCE_FRAMEWORK_REDUCE_OUTPUT_RECORDS" };
+
+  // [FILE_SYSTEMS_HDFS_BYTES_READ] :801280331655
+  // [FILE_SYSTEMS_HDFS_BYTES_WRITTEN] :44142889
+  // [FILE_SYSTEMS_LOCAL_BYTES_READ] :1735570776310
+  // [FILE_SYSTEMS_LOCAL_BYTES_WRITTEN] :2610893176016
+  // [HodId] :0.0
+  // [JOB_COUNTERS__DATA-LOCAL_MAP_TASKS] :5545
+  // [JOB_COUNTERS__LAUNCHED_MAP_TASKS] :5912
+  // [JOB_COUNTERS__LAUNCHED_REDUCE_TASKS] :739
+  // [JOB_COUNTERS__RACK-LOCAL_MAP_TASKS] :346
+  // [JobId] :2.008042104030008E15
+  // [MAP-REDUCE_FRAMEWORK_COMBINE_INPUT_RECORDS] :0
+  // [MAP-REDUCE_FRAMEWORK_COMBINE_OUTPUT_RECORDS] :0
+  // [MAP-REDUCE_FRAMEWORK_MAP_INPUT_BYTES] :801273929542
+  // [MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS] :9406887059
+  // [MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES] :784109666437
+  // [MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS] :9406887059
+  // [MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_GROUPS] :477623
+  // [MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_RECORDS] :739000
+  // [MAP-REDUCE_FRAMEWORK_REDUCE_OUTPUT_RECORDS] :739000
+
+  @Override
+  public void process(Reader reader) throws DBException {
+    ChukwaRecordKey key = new ChukwaRecordKey();
+    ChukwaRecord record = new ChukwaRecord();
+    try {
+      StringBuilder sb = new StringBuilder();
+      while (reader.next(key, record)) {
+
+        sb.append("insert into MRJobCounters ");
+        for (String field : fields) {
+          sb.append(" set ").append(field).append(" = ").append(
+              record.getValue(field)).append(", ");
+        }
+        sb.append(" set timestamp =").append(record.getTime()).append(";\n");
+      }
+      System.out.println(sb.toString());
+    } catch (Exception e) {
+      log.error("Unable to insert data into database" + e.getMessage());
+      e.printStackTrace();
+    }
 
-	}
+  }
 
 }
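
Note that process() only assembles statement text and prints it to stdout; nothing is executed against a database. A standalone sketch of the string the loop builds, using two counters from the commented sample dump as placeholder values:

// Sketch of the statement text assembled by MRJobCounters.process()
// (values and timestamp are placeholders taken from the sample dump above).
public class MRJobCountersSqlSketch {
  public static void main(String[] args) {
    String[] fields = { "FILE_SYSTEMS_HDFS_BYTES_READ",
        "JOB_COUNTERS__LAUNCHED_MAP_TASKS" };
    String[] values = { "801280331655", "5912" };
    long timestamp = 1236811166000L;  // placeholder record time

    StringBuilder sb = new StringBuilder();
    sb.append("insert into MRJobCounters ");
    for (int i = 0; i < fields.length; i++) {
      sb.append(" set ").append(fields[i]).append(" = ").append(values[i])
          .append(", ");
    }
    sb.append(" set timestamp =").append(timestamp).append(";\n");
    System.out.println(sb.toString());
  }
}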