You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/12/31 17:10:12 UTC
svn commit: r894877 - in /hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection: test/FilePerPostWriter.java writer/SeqFileWriter.java
Author: asrabkin
Date: Thu Dec 31 16:10:11 2009
New Revision: 894877
URL: http://svn.apache.org/viewvc?rev=894877&view=rev
Log:
CHUKWA-433. Revisions
Modified:
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java?rev=894877&r1=894876&r2=894877&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java Thu Dec 31 16:10:11 2009
@@ -21,7 +21,9 @@
import java.net.URI;
+import java.util.Calendar;
import java.util.List;
+import java.util.Timer;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
@@ -29,6 +31,7 @@
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.*;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter.StatReportingTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -46,22 +49,54 @@
String baseName;
AtomicLong counter = new AtomicLong(0);
+ protected FileSystem fs = null;
+ protected Configuration conf = null;
+
+ protected String outputDir = null;
+// private Calendar calendar = Calendar.getInstance();
+
+ protected Path currentPath = null;
+ protected String currentFileName = null;
+
+
@Override
- public CommitStatus add(List<Chunk> chunks) throws WriterException {
+ public synchronized CommitStatus add(List<Chunk> chunks) throws WriterException {
+
try {
String newName = baseName +"_" +counter.incrementAndGet();
Path newOutputPath = new Path(newName + ".done");
- FSDataOutputStream newOutputStr = fs.create(newOutputPath);
- currentOutputStr = newOutputStr;
+ FSDataOutputStream currentOutputStr = fs.create(newOutputPath);
currentPath = newOutputPath;
currentFileName = newName;
// Uncompressed for now
- seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
+ SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, currentOutputStr,
ChukwaArchiveKey.class, ChunkImpl.class,
SequenceFile.CompressionType.NONE, null);
- super.add(chunks);
+ ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+ if (System.currentTimeMillis() >= nextTimePeriodComputation) {
+ computeTimePeriod();
+ }
+
+ for (Chunk chunk : chunks) {
+ archiveKey.setTimePartition(timePeriod);
+ archiveKey.setDataType(chunk.getDataType());
+ archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
+ + "/" + chunk.getStreamName());
+ archiveKey.setSeqId(chunk.getSeqID());
+
+ if (chunk != null) {
+ // compute size for stats
+ dataSize += chunk.getData().length;
+ bytesThisRotate += chunk.getData().length;
+ seqFileWriter.append(archiveKey, chunk);
+ }
+
+ }
+
seqFileWriter.close();
+ currentOutputStr.close();
} catch(IOException e) {
throw new WriterException(e);
}
@@ -87,6 +122,13 @@
fs = FileSystem.get(new URI(fsname), conf);
isRunning = true;
+
+ statTimer = new Timer();
+ statTimer.schedule(new StatReportingTask(), 1000,
+ STAT_INTERVAL_SECONDS * 1000);
+
+
+ nextTimePeriodComputation = 0;
} catch(Exception e) {
throw new WriterException(e);
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=894877&r1=894876&r2=894877&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Thu Dec 31 16:10:11 2009
@@ -48,7 +48,7 @@
static Logger log = Logger.getLogger(SeqFileWriter.class);
public static boolean ENABLE_ROTATION_ON_CLOSE = true;
- int STAT_INTERVAL_SECONDS = 30;
+ protected int STAT_INTERVAL_SECONDS = 30;
private int rotateInterval = 1000 * 60 * 5;
public static final String STAT_PERIOD_OPT = "chukwaCollector.stats.period";
@@ -69,14 +69,14 @@
protected FSDataOutputStream currentOutputStr = null;
protected SequenceFile.Writer seqFileWriter = null;
- private long timePeriod = -1;
- private long nextTimePeriodComputation = -1;
+ protected long timePeriod = -1;
+ protected long nextTimePeriodComputation = -1;
protected Timer rotateTimer = null;
protected Timer statTimer = null;
- private volatile long dataSize = 0;
- private volatile long bytesThisRotate = 0;
+ protected volatile long dataSize = 0;
+ protected volatile long bytesThisRotate = 0;
protected volatile boolean isRunning = false;
static {
@@ -143,7 +143,7 @@
}
- private class StatReportingTask extends TimerTask {
+ public class StatReportingTask extends TimerTask {
private long lastTs = System.currentTimeMillis();
public void run() {
@@ -159,6 +159,8 @@
log.info("stat:datacollection.writer.hdfs dataSize=" + currentDs
+ " dataRate=" + dataRate);
}
+
+ public StatReportingTask() {}
};
void rotate() {