You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/12/31 17:10:12 UTC

svn commit: r894877 - in /hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection: test/FilePerPostWriter.java writer/SeqFileWriter.java

Author: asrabkin
Date: Thu Dec 31 16:10:11 2009
New Revision: 894877

URL: http://svn.apache.org/viewvc?rev=894877&view=rev
Log:
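CHUKWA-433. Revisions: FilePerPostWriter.add now writes each post's chunks directly to its own sequence file instead of calling super.add, and the SeqFileWriter members it relies on (stat/time-period fields, StatReportingTask) are widened to protected/public.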

Modified:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java?rev=894877&r1=894876&r2=894877&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java Thu Dec 31 16:10:11 2009
@@ -21,7 +21,9 @@
 import java.net.URI;
 
 
+import java.util.Calendar;
 import java.util.List;
+import java.util.Timer;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
@@ -29,6 +31,7 @@
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.writer.*;
 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter.StatReportingTask;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -46,22 +49,54 @@
   String baseName;
   AtomicLong counter = new AtomicLong(0);
   
+  protected FileSystem fs = null;
+  protected Configuration conf = null;
+
+  protected String outputDir = null;
+//  private Calendar calendar = Calendar.getInstance();
+
+  protected Path currentPath = null;
+  protected String currentFileName = null;
+
+  
   @Override
-  public CommitStatus add(List<Chunk> chunks) throws WriterException {
+  public synchronized CommitStatus add(List<Chunk> chunks) throws WriterException {
+
     try {
       String newName = baseName +"_" +counter.incrementAndGet();
       Path newOutputPath = new Path(newName + ".done");
-      FSDataOutputStream newOutputStr = fs.create(newOutputPath);
-      currentOutputStr = newOutputStr;
+      FSDataOutputStream currentOutputStr = fs.create(newOutputPath);
       currentPath = newOutputPath;
       currentFileName = newName;
       // Uncompressed for now
-      seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
+      SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, currentOutputStr,
           ChukwaArchiveKey.class, ChunkImpl.class,
           SequenceFile.CompressionType.NONE, null);
     
-      super.add(chunks);
+      ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+      
+      if (System.currentTimeMillis() >= nextTimePeriodComputation) {
+        computeTimePeriod();
+      }
+
+      for (Chunk chunk : chunks) {
+        archiveKey.setTimePartition(timePeriod);
+        archiveKey.setDataType(chunk.getDataType());
+        archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
+            + "/" + chunk.getStreamName());
+        archiveKey.setSeqId(chunk.getSeqID());
+
+        if (chunk != null) {
+          // compute size for stats
+          dataSize += chunk.getData().length;
+          bytesThisRotate += chunk.getData().length;
+          seqFileWriter.append(archiveKey, chunk);
+        }
+
+      }
+      
       seqFileWriter.close();
+      currentOutputStr.close();
     } catch(IOException e) {
       throw new WriterException(e);
     }
@@ -87,6 +122,13 @@
 
       fs = FileSystem.get(new URI(fsname), conf);
       isRunning = true;
+      
+      statTimer = new Timer();
+      statTimer.schedule(new StatReportingTask(), 1000,
+          STAT_INTERVAL_SECONDS * 1000);
+
+
+      nextTimePeriodComputation = 0;
     } catch(Exception e) {
       throw new WriterException(e);
     }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=894877&r1=894876&r2=894877&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Thu Dec 31 16:10:11 2009
@@ -48,7 +48,7 @@
   static Logger log = Logger.getLogger(SeqFileWriter.class);
   public static boolean ENABLE_ROTATION_ON_CLOSE = true;
 
-  int STAT_INTERVAL_SECONDS = 30;
+  protected int STAT_INTERVAL_SECONDS = 30;
   private int rotateInterval = 1000 * 60 * 5;
   
   public static final String STAT_PERIOD_OPT = "chukwaCollector.stats.period";
@@ -69,14 +69,14 @@
   protected FSDataOutputStream currentOutputStr = null;
   protected SequenceFile.Writer seqFileWriter = null;
 
-  private long timePeriod = -1;
-  private long nextTimePeriodComputation = -1;
+  protected long timePeriod = -1;
+  protected long nextTimePeriodComputation = -1;
   
   protected Timer rotateTimer = null;  
   protected Timer statTimer = null;
   
-  private volatile long dataSize = 0;
-  private volatile long bytesThisRotate = 0;
+  protected volatile long dataSize = 0;
+  protected volatile long bytesThisRotate = 0;
   protected volatile boolean isRunning = false;
   
   static {
@@ -143,7 +143,7 @@
 
   }
 
-  private class StatReportingTask extends TimerTask {
+  public class StatReportingTask extends TimerTask {
     private long lastTs = System.currentTimeMillis();
 
     public void run() {
@@ -159,6 +159,8 @@
       log.info("stat:datacollection.writer.hdfs dataSize=" + currentDs
           + " dataRate=" + dataRate);
     }
+    
+    public StatReportingTask() {}
   };
 
   void rotate() {