Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC
svn commit: r752666 [5/16] - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/
src/java/org/apache/hadoop/chukwa/database/
src/java/org/apache/hadoop/chukwa/datacollection/
src/java/org/apache/hadoop...
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java Wed Mar 11 22:39:26 2009
@@ -18,15 +18,16 @@
package org.apache.hadoop.chukwa.datacollection.writer;
-import java.util.List;
+import java.util.List;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.conf.Configuration;
-public interface ChukwaWriter
-{
- public void init(Configuration c) throws WriterException;
- public void add(List<Chunk> chunks) throws WriterException;
- public void close() throws WriterException;;
+public interface ChukwaWriter {
+ public void init(Configuration c) throws WriterException;
+
+ public void add(List<Chunk> chunks) throws WriterException;
+
+ public void close() throws WriterException;;
}
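For reference, a minimal sketch of a writer built against the reformatted interface above. The class name and the byte counter are illustrative only; they are not part of this commit.

    package org.apache.hadoop.chukwa.datacollection.writer;

    import java.util.List;
    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.conf.Configuration;

    // Hypothetical example: counts the bytes it is handed and discards the data.
    public class NullWriter implements ChukwaWriter {
      private volatile long bytesSeen = 0;

      public void init(Configuration c) throws WriterException {
        // nothing to set up for this sketch
      }

      public void add(List<Chunk> chunks) throws WriterException {
        for (Chunk chunk : chunks) {
          bytesSeen += chunk.getData().length; // same accessor ConsoleWriter uses below
        }
      }

      public void close() throws WriterException {
        System.out.println("NullWriter saw " + bytesSeen + " bytes");
      }
    }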
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ClientAck.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ClientAck.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ClientAck.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ClientAck.java Wed Mar 11 22:39:26 2009
@@ -18,89 +18,83 @@
package org.apache.hadoop.chukwa.datacollection.writer;
+
import org.apache.log4j.Logger;
-public class ClientAck
-{
- static Logger log = Logger.getLogger(ClientAck.class);
-
- // TODO move all constant to config
-
- public static final int OK = 100;
- public static final int KO = -100;
- public static final int KO_LOCK = -200;
-
- private long ts = 0;
-
- private Object lock = new Object();
- private int status = 0;
- private Throwable exception = null;
- private int waitTime = 6*1000;// 6 secs
- private int timeOut = 15*1000;
-
- public ClientAck()
- {
- this.ts = System.currentTimeMillis() + timeOut;
- }
-
- public int getTimeOut()
- {
+public class ClientAck {
+ static Logger log = Logger.getLogger(ClientAck.class);
+
+ // TODO move all constant to config
+
+ public static final int OK = 100;
+ public static final int KO = -100;
+ public static final int KO_LOCK = -200;
+
+ private long ts = 0;
+
+ private Object lock = new Object();
+ private int status = 0;
+ private Throwable exception = null;
+ private int waitTime = 6 * 1000;// 6 secs
+ private int timeOut = 15 * 1000;
+
+ public ClientAck() {
+ this.ts = System.currentTimeMillis() + timeOut;
+ }
+
+ public int getTimeOut() {
return timeOut;
}
- public void wait4Ack()
- {
- synchronized(lock)
- {
-// log.info(">>>>>>>>>>>>>>>>>>>>>>>>> Client synch");
- while (this.status == 0)
- {
-// log.info(">>>>>>>>>>>>>>>>>>>>>>>>> Client Before wait");
- try { lock.wait(waitTime);}
- catch(InterruptedException e)
- {}
- long now = System.currentTimeMillis();
- if (now > ts)
- {
- this.status = KO_LOCK;
- this.exception = new RuntimeException("More than maximum time lock [" + this.toString() +"]");
- }
- }
-// log.info("[" + Thread.currentThread().getName() + "] >>>>>>>>>>>>>>>>> Client after wait status [" + status + "] [" + this.toString() + "]");
- }
- }
-
- public void releaseLock(int status, Throwable exception)
- {
- this.exception = exception;
- this.status = status;
-
-// log.info("[" + Thread.currentThread().getName() + "] <<<<<<<<<<<<<<<<< Server synch [" + status + "] ----->>>> [" + this.toString() + "]");
- synchronized(lock)
- {
-// log.info("<<<<<<<<<<<<<<< Server before notify");
- lock.notifyAll();
- }
-// log.info("<<<<<<<<<<<<<<< Server after notify");
- }
-
- public int getStatus()
- {
- return status;
- }
-
- public void setStatus(int status)
- {
- this.status = status;
- }
-
- public Throwable getException()
- {
- return exception;
- }
-
- public void setException(Throwable exception)
- {
- this.exception = exception;
- }
+ public void wait4Ack() {
+ synchronized (lock) {
+ // log.info(">>>>>>>>>>>>>>>>>>>>>>>>> Client synch");
+ while (this.status == 0) {
+ // log.info(">>>>>>>>>>>>>>>>>>>>>>>>> Client Before wait");
+ try {
+ lock.wait(waitTime);
+ } catch (InterruptedException e) {
+ }
+ long now = System.currentTimeMillis();
+ if (now > ts) {
+ this.status = KO_LOCK;
+ this.exception = new RuntimeException("More than maximum time lock ["
+ + this.toString() + "]");
+ }
+ }
+ // log.info("[" + Thread.currentThread().getName() +
+ // "] >>>>>>>>>>>>>>>>> Client after wait status [" + status + "] [" +
+ // this.toString() + "]");
+ }
+ }
+
+ public void releaseLock(int status, Throwable exception) {
+ this.exception = exception;
+ this.status = status;
+
+ // log.info("[" + Thread.currentThread().getName() +
+ // "] <<<<<<<<<<<<<<<<< Server synch [" + status + "] ----->>>> [" +
+ // this.toString() + "]");
+ synchronized (lock) {
+ // log.info("<<<<<<<<<<<<<<< Server before notify");
+ lock.notifyAll();
+ }
+ // log.info("<<<<<<<<<<<<<<< Server after notify");
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ public Throwable getException() {
+ return exception;
+ }
+
+ public void setException(Throwable exception) {
+ this.exception = exception;
+ }
}
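ClientAck is a small rendezvous object: a thread that has handed data to the writer blocks in wait4Ack() until a background flusher calls releaseLock() with a status, or the built-in timeout turns the wait into KO_LOCK. A hedged sketch of that handshake; the thread wiring is illustrative, and SeqFileWriter below drives the same pattern from a Timer.

    final ClientAck ack = new ClientAck();

    // Producer side: block until the flush is acknowledged or times out.
    Thread producer = new Thread(new Runnable() {
      public void run() {
        ack.wait4Ack();
        if (ack.getStatus() != ClientAck.OK) {
          System.err.println("flush failed: " + ack.getException());
        }
      }
    });
    producer.start();

    // Flusher side: wake every waiter once the data has been flushed.
    ack.releaseLock(ClientAck.OK, null);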
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java Wed Mar 11 22:39:26 2009
@@ -18,88 +18,84 @@
package org.apache.hadoop.chukwa.datacollection.writer;
+
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
-
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.conf.Configuration;
public class ConsoleWriter implements ChukwaWriter {
boolean printData;
- volatile long dataSize=0;
+ volatile long dataSize = 0;
final Timer statTimer;
-
-
+
private class StatReportingTask extends TimerTask {
- private long lastTs=System.currentTimeMillis();
- private long lastDataSize=0;
+ private long lastTs = System.currentTimeMillis();
+ private long lastDataSize = 0;
+
public void run() {
- long time = System.currentTimeMillis();
- long interval= time - lastTs;
+ long time = System.currentTimeMillis();
+ long interval = time - lastTs;
lastTs = time;
-
+
long ds = dataSize;
- long dataRate = 1000 * (ds - lastDataSize) / interval; //bytes/sec
- //refers only to data field, not including http or chukwa headers
+ long dataRate = 1000 * (ds - lastDataSize) / interval; // bytes/sec
+ // refers only to data field, not including http or chukwa headers
lastDataSize = ds;
-
- System.out.println("stat=datacollection.writer.ConsoleWriter|dataRate=" + dataRate );
+
+ System.out.println("stat=datacollection.writer.ConsoleWriter|dataRate="
+ + dataRate);
}
};
-
public ConsoleWriter() {
this(true);
}
-
+
public ConsoleWriter(boolean printData) {
this.printData = printData;
statTimer = new Timer();
}
-
- public void close()
- {
+
+ public void close() {
statTimer.cancel();
}
- public void init(Configuration conf) throws WriterException
- {
- System.out.println("---- DUMMY HDFS WRITER IN USE ---");
+ public void init(Configuration conf) throws WriterException {
+ System.out.println("---- DUMMY HDFS WRITER IN USE ---");
- statTimer.schedule(new StatReportingTask(), 1000,10*1000);
+ statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
}
- public void add(Chunk data) throws WriterException
- {
+ public void add(Chunk data) throws WriterException {
int startOffset = 0;
dataSize += data.getData().length;
- if(printData) {
+ if (printData) {
System.out.println(data.getData().length + " bytes of data in chunk");
- for(int offset: data.getRecordOffsets()) {
+ for (int offset : data.getRecordOffsets()) {
System.out.print(data.getStreamName());
System.out.print(" ");
System.out.print(data.getSource());
System.out.print(" ");
System.out.print(data.getDataType());
System.out.print(") ");
- System.out.print(new String(data.getData(), startOffset, offset - startOffset + 1));
- startOffset= offset + 1;
+ System.out.print(new String(data.getData(), startOffset, offset
+ - startOffset + 1));
+ startOffset = offset + 1;
}
}
}
-@Override
-public void add(List<Chunk> chunks) throws WriterException
-{
- for(Chunk chunk: chunks)
- {
- add(chunk);
- }
-
-}
+ @Override
+ public void add(List<Chunk> chunks) throws WriterException {
+ for (Chunk chunk : chunks) {
+ add(chunk);
+ }
+
+ }
}
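A short usage sketch for ConsoleWriter; the chunk list is assumed to come from elsewhere, since constructing chunks depends on ChunkImpl and is outside this file.

    ChukwaWriter writer = new ConsoleWriter(true); // true = print chunk contents
    writer.init(new Configuration());              // prints the dummy-writer banner, starts the stat timer
    writer.add(chunks);                            // chunks is a List<Chunk> built elsewhere
    writer.close();                                // cancels the stat timer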
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java Wed Mar 11 22:39:26 2009
@@ -17,21 +17,21 @@
*/
package org.apache.hadoop.chukwa.datacollection.writer;
+
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
-
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.conf.Configuration;
public class Dedup implements PipelineableWriter {
-
+
static final class DedupKey {
String name;
- long val; //sequence number
-
+ long val; // sequence number
+
public DedupKey(String n, long p) {
name = n;
val = p;
@@ -42,43 +42,45 @@
}
public boolean equals(Object dk) {
- if(dk instanceof DedupKey)
- return name.equals(((DedupKey)dk).name) && val == ((DedupKey)dk).val;
- else return false;
+ if (dk instanceof DedupKey)
+ return name.equals(((DedupKey) dk).name) && val == ((DedupKey) dk).val;
+ else
+ return false;
}
}
-
+
static class FixedSizeCache<EntryType> {
final HashSet<EntryType> hs;
final Queue<EntryType> toDrop;
final int maxSize;
- volatile long dupchunks =0;
+ volatile long dupchunks = 0;
+
public FixedSizeCache(int size) {
maxSize = size;
hs = new HashSet<EntryType>(maxSize);
toDrop = new ArrayDeque<EntryType>(maxSize);
}
-
+
public synchronized void add(EntryType t) {
- if(maxSize == 0)
+ if (maxSize == 0)
return;
-
- if(hs.size() >= maxSize)
- while(hs.size() >= maxSize) {
+
+ if (hs.size() >= maxSize)
+ while (hs.size() >= maxSize) {
EntryType td = toDrop.remove();
hs.remove(td);
}
-
+
hs.add(t);
toDrop.add(t);
}
-
+
private synchronized boolean addAndCheck(EntryType t) {
- if(maxSize == 0)
+ if (maxSize == 0)
return false;
-
- boolean b= hs.contains(t);
- if(b)
+
+ boolean b = hs.contains(t);
+ if (b)
dupchunks++;
else {
hs.add(t);
@@ -86,12 +88,11 @@
}
return b;
}
-
+
private long dupCount() {
return dupchunks;
}
}
-
FixedSizeCache<DedupKey> cache;
ChukwaWriter next;
@@ -104,11 +105,11 @@
@Override
public void add(List<Chunk> chunks) throws WriterException {
ArrayList<Chunk> passedThrough = new ArrayList<Chunk>();
- for(Chunk c: chunks)
- if(! cache.addAndCheck(new DedupKey(c.getStreamName(), c.getSeqID())))
+ for (Chunk c : chunks)
+ if (!cache.addAndCheck(new DedupKey(c.getStreamName(), c.getSeqID())))
passedThrough.add(c);
-
- if(!passedThrough.isEmpty())
+
+ if (!passedThrough.isEmpty())
next.add(passedThrough);
}
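Dedup is a pipeline stage: chunks whose (stream name, sequence ID) pair is already in the fixed-size cache are dropped, everything else is forwarded to the next writer. A hedged wiring sketch; init() comes from ChukwaWriter, and the cache sizing is not visible in this hunk, so treat the setup as illustrative.

    ConsoleWriter sink = new ConsoleWriter(false);
    sink.init(conf);                  // conf is an org.apache.hadoop.conf.Configuration

    Dedup dedup = new Dedup();
    dedup.setNextStage(sink);         // forward unique chunks downstream
    dedup.init(conf);                 // presumably sizes the FixedSizeCache from conf
    dedup.add(chunks);                // recently-seen (streamName, seqID) pairs are dropped here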
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java Wed Mar 11 22:39:26 2009
@@ -17,9 +17,9 @@
*/
package org.apache.hadoop.chukwa.datacollection.writer;
+
import java.io.*;
import java.util.List;
-
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.conf.Configuration;
@@ -64,22 +64,22 @@
*
* @param bytes amount to try to read
* @param ms time to wait
- * @return a newly read-in chunk
+ * @return a newly read-in chunk
* @throws IOException
*/
public Chunk readOutChunk(int bytes, int ms) throws IOException {
long readStartTime = System.currentTimeMillis();
try {
- while(buf.size() < bytes ) {
- synchronized(this) {
+ while (buf.size() < bytes) {
+ synchronized (this) {
long timeLeft = ms - System.currentTimeMillis() + readStartTime;
- if(timeLeft > 0)
- wait(timeLeft);
+ if (timeLeft > 0)
+ wait(timeLeft);
}
}
- if(dis == null)
- dis = new DataInputStream( new ByteArrayInputStream(buf.toByteArray()));
+ if (dis == null)
+ dis = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
return ChunkImpl.read(dis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
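InMemoryWriter buffers serialized chunks and lets a test read them back; readOutChunk blocks until at least the requested number of bytes is available or the timeout elapses, then decodes one chunk. A hedged usage sketch; add() and init() are inherited from ChukwaWriter and not shown in this hunk.

    InMemoryWriter writer = new InMemoryWriter();
    writer.init(conf);                          // conf: org.apache.hadoop.conf.Configuration
    writer.add(chunks);                         // serialize some chunks into the in-memory buffer
    Chunk first = writer.readOutChunk(1, 5000); // wait up to 5 s for at least 1 byte, then decode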
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java Wed Mar 11 22:39:26 2009
@@ -18,9 +18,9 @@
package org.apache.hadoop.chukwa.datacollection.writer;
+
import java.util.ArrayList;
import java.util.List;
-
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
@@ -28,7 +28,7 @@
public class PipelineStageWriter implements ChukwaWriter {
Logger log = Logger.getLogger(PipelineStageWriter.class);
- ChukwaWriter writer; //head of pipeline
+ ChukwaWriter writer; // head of pipeline
@Override
public void add(List<Chunk> chunks) throws WriterException {
@@ -47,45 +47,50 @@
try {
String[] classes = pipeline.split(",");
ArrayList<PipelineableWriter> stages = new ArrayList<PipelineableWriter>();
-
- PipelineableWriter lastWriter= null;
- if(classes.length > 1) {
- lastWriter = (PipelineableWriter) conf.getClassByName(classes[0]).newInstance();
+
+ PipelineableWriter lastWriter = null;
+ if (classes.length > 1) {
+ lastWriter = (PipelineableWriter) conf.getClassByName(classes[0])
+ .newInstance();
lastWriter.init(conf);
writer = lastWriter;
}
-
- for(int i = 1; i < classes.length -1; ++i) {
+
+ for (int i = 1; i < classes.length - 1; ++i) {
Class stageClass = conf.getClassByName(classes[i]);
Object st = stageClass.newInstance();
- if(!(st instanceof PipelineableWriter))
- log.error("class "+ classes[i]+ " in processing pipeline isn't a pipeline stage");
-
- PipelineableWriter stage = (PipelineableWriter) stageClass.newInstance();
+ if (!(st instanceof PipelineableWriter))
+ log.error("class " + classes[i]
+ + " in processing pipeline isn't a pipeline stage");
+
+ PipelineableWriter stage = (PipelineableWriter) stageClass
+ .newInstance();
stage.init(conf);
- //throws exception if types don't match or class not found; this is OK.
-
+ // throws exception if types don't match or class not found; this is
+ // OK.
+
lastWriter.setNextStage(stage);
lastWriter = stage;
}
- Class stageClass = conf.getClassByName(classes[classes.length-1]);
+ Class stageClass = conf.getClassByName(classes[classes.length - 1]);
Object st = stageClass.newInstance();
-
- if(!(st instanceof ChukwaWriter)) {
- log.error("class "+ classes[classes.length-1]+ " at end of processing pipeline isn't a ChukwaWriter");
+
+ if (!(st instanceof ChukwaWriter)) {
+ log.error("class " + classes[classes.length - 1]
+ + " at end of processing pipeline isn't a ChukwaWriter");
throw new WriterException("bad pipeline");
} else {
- if(lastWriter != null)
+ if (lastWriter != null)
lastWriter.setNextStage((ChukwaWriter) st);
else
- writer = (ChukwaWriter) st; //one stage pipeline
+ writer = (ChukwaWriter) st; // one stage pipeline
}
- return;
- } catch(Exception e) {
- //if anything went wrong (missing class, etc) we wind up here.
- log.error("failed to set up pipeline, defaulting to SeqFileWriter",e);
- //fall through to default case
- throw new WriterException("bad pipeline");
+ return;
+ } catch (Exception e) {
+ // if anything went wrong (missing class, etc) we wind up here.
+ log.error("failed to set up pipeline, defaulting to SeqFileWriter", e);
+ // fall through to default case
+ throw new WriterException("bad pipeline");
}
} else {
throw new WriterException("must set chukwaCollector.pipeline");
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.datacollection.writer;
-public interface PipelineableWriter extends ChukwaWriter{
- public void setNextStage(ChukwaWriter next);
+
+public interface PipelineableWriter extends ChukwaWriter {
+ public void setNextStage(ChukwaWriter next);
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.datacollection.writer;
+
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -25,7 +26,6 @@
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
-
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
@@ -41,384 +41,352 @@
* this object.
*
*/
-public class SeqFileWriter implements ChukwaWriter
-{
- public static final boolean ENABLE_ROTATION = true;
-
- static final int STAT_INTERVAL_SECONDS = 30;
- static final Object lock = new Object();
-
- static Logger log = Logger.getLogger(SeqFileWriter.class);
-
- private FileSystem fs = null;
- private Configuration conf = null;
-
- private String outputDir = null;
- private Calendar calendar = Calendar.getInstance();
-
- private Path currentPath = null;
- private String currentFileName = null;
- private FSDataOutputStream currentOutputStr = null;
- private static SequenceFile.Writer seqFileWriter = null;
-
- private static ClientAck clientAck = new ClientAck();
- private static long nextRotate = 0;
- private static int rotateInterval = 1000*60;
-
- private static Timer clientAckTimer = null;
-
- private Timer timer = null;
-
- private Timer statTimer = null;
- private volatile long dataSize = 0;
-
-
- private int initWriteChunkRetries = 10;
- private int writeChunkRetries = initWriteChunkRetries;
- private boolean chunksWrittenThisRotate = false;
-
- public SeqFileWriter() throws WriterException
- {
- }
-
- public void init(Configuration conf) throws WriterException
- {
- outputDir = conf.get("chukwaCollector.outputDir", "/chukwa");
-
- rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
- 1000 * 60 * 5);//defaults to 5 minutes
- nextRotate = System.currentTimeMillis() + rotateInterval;
-
- initWriteChunkRetries = conf.getInt("chukwaCollector.writeChunkRetries", 10);
- writeChunkRetries = initWriteChunkRetries;
-
- //check if they've told us the file system to use
- String fsname = conf.get("writer.hdfs.filesystem");
- if (fsname == null || fsname.equals(""))
- {
- //otherwise try to get the filesystem from hadoop
- fsname = conf.get("fs.default.name");
- }
-
-
- log.info("rotateInterval is " + rotateInterval);
- log.info("outputDir is " + outputDir);
- log.info("fsname is " + fsname);
- log.info("filesystem type from core-default.xml is "
- + conf.get("fs.hdfs.impl"));
-
- if (fsname == null) {
- log.error("no filesystem name");
- throw new WriterException("no filesystem");
- } try {
- fs = FileSystem.get(new URI(fsname), conf);
- if (fs == null) {
- log.error("can't connect to HDFS at " + fs.getUri());
- return;
- } else
- log.info("filesystem is " + fs.getUri());
- } catch (IOException e) {
- log.error(
- "can't connect to HDFS, trying default file system instead (likely to be local)",
- e);
- try {
- fs = FileSystem.get(conf);
- } catch (IOException err) {
- log.error("can't connect to default file system either", e);
- }
- } catch (URISyntaxException e) {
- log.error("problem generating new URI from config setting");
- return;
- }
-
- // Setup everything by rotating
- rotate();
-
- clientAckTimer = new Timer();
- clientAckTimer.schedule(new TimerTask()
- {
- public void run()
- {
- synchronized (lock)
- {
- ClientAck previous = clientAck ;
- SeqFileWriter.clientAck = new ClientAck();
-
- try
- {
- // SeqFile is uncompressed for now
- // So we can flush every xx secs
- // But if we're using block Compression
- // this is not true anymore
- // because this will trigger
- // the compression
- if (currentOutputStr != null)
- {
- currentOutputStr.flush();
- }
- previous.releaseLock(ClientAck.OK, null);
- long now = System.currentTimeMillis();
- if (now >= nextRotate)
- {
- nextRotate = System.currentTimeMillis() + rotateInterval;
- rotate();
- }
- }
- catch(Throwable e)
- {
- previous.releaseLock(ClientAck.KO, e);
- log.warn("Exception when flushing ", e);
- e.printStackTrace();
- }
- }
- }
-
- }, (5*1000), (5*1000));
-
- statTimer = new Timer();
- statTimer.schedule(new StatReportingTask(), 1000, STAT_INTERVAL_SECONDS * 1000);
-
-
-
- }
-
- private class StatReportingTask extends TimerTask
- {
- private long lastTs = System.currentTimeMillis();
-
- public void run()
- {
-
- long time = System.currentTimeMillis();
- long currentDs = dataSize;
- dataSize = 0;
-
- long interval = time - lastTs;
- lastTs = time;
-
- long dataRate = 1000 * currentDs / interval; // kb/sec
- log.info("stat:datacollection.writer.hdfs dataSize=" + currentDs + " dataRate=" + dataRate);
- }
- };
-
-
- void rotate()
- {
- calendar.setTimeInMillis(System.currentTimeMillis());
-
- log.info("start Date [" + calendar.getTime() + "]");
- log.info("Rotate from " + Thread.currentThread().getName());
-
- String newName = new java.text.SimpleDateFormat("yyyyddHHmmssSSS").format(calendar.getTime());
- newName += "_" + new java.rmi.server.UID().toString();
- newName = newName.replace("-", "");
- newName = newName.replace(":", "");
- newName = newName.replace(".", "");
- newName = outputDir + "/" + newName.trim();
-
-
- synchronized (lock)
- {
- try
- {
- FSDataOutputStream previousOutputStr = currentOutputStr;
- Path previousPath = currentPath;
- String previousFileName = currentFileName;
-
- if (previousOutputStr != null)
- {
- previousOutputStr.close();
- if(chunksWrittenThisRotate) {
- fs.rename(previousPath, new Path(previousFileName + ".done"));
- } else {
- log.info("no chunks written to "+ previousPath + ", deleting");
- fs.delete(previousPath, false);
- }
- }
- Path newOutputPath = new Path(newName + ".chukwa");
- FSDataOutputStream newOutputStr = fs.create(newOutputPath);
- currentOutputStr = newOutputStr;
- currentPath = newOutputPath;
- currentFileName = newName;
- chunksWrittenThisRotate = false;
- // Uncompressed for now
- seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
- ChukwaArchiveKey.class, ChunkImpl.class,
- SequenceFile.CompressionType.NONE, null);
-
- }
- catch (IOException e)
- {
- log.fatal("IO Exception in rotate. Exiting!");
- e.printStackTrace();
- // TODO
- // As discussed for now:
- // Everytime this happen in the past it was because HDFS was down,
- // so there's nothing we can do
- // Shutting down the collector for now
- // Watchdog will re-start it automatically
- System.exit(-1);
- }
- }
-
- log.debug("finished rotate()");
- }
-
- // TODO merge the 2 add functions
- @Override
- public void add(List<Chunk> chunks) throws WriterException
- {
- if (chunks != null) {
- try {
- chunksWrittenThisRotate = true;
- ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
-
- // FIXME compute this once an hour
- //
- synchronized (calendar)
- {
- calendar.setTimeInMillis(System.currentTimeMillis());
- calendar.set(Calendar.MINUTE, 0);
- calendar.set(Calendar.SECOND, 0);
- calendar.set(Calendar.MILLISECOND, 0);
-
- archiveKey.setTimePartition(calendar.getTimeInMillis());
- }
-
- ClientAck localClientAck = null;
- synchronized(lock)
- {
- localClientAck = SeqFileWriter.clientAck;
- for (Chunk chunk : chunks)
- {
- archiveKey.setDataType(chunk.getDataType());
- archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource() + "/" + chunk.getStreamName());
- archiveKey.setSeqId(chunk.getSeqID());
-
- if (chunk != null)
- {
- seqFileWriter.append(archiveKey, chunk);
- // compute size for stats
- dataSize += chunk.getData().length;
- }
-
- }
- }// End synchro
-
- localClientAck.wait4Ack();
- if (localClientAck.getStatus() != ClientAck.OK)
- {
- log.warn("Exception after notyfyAll on the lock - Thread:" + Thread.currentThread().getName(),localClientAck.getException());
- throw new WriterException(localClientAck.getException());
- }
- else
- {
- // sucess
- writeChunkRetries = initWriteChunkRetries;
- }
-
- }
- catch (IOException e)
- {
- writeChunkRetries --;
- log.error("Could not save the chunk. ", e);
-
- if (writeChunkRetries < 0)
- {
- log.fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
- System.exit(-1);
- }
- throw new WriterException(e);
- }
- }
-
- }
-
- public void add(Chunk chunk) throws WriterException
- {
-
- if (chunk != null) {
- try {
- ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
-
- // FIXME compute this once an hour
- synchronized (calendar)
- {
- calendar.setTimeInMillis(System.currentTimeMillis());
- calendar.set(Calendar.MINUTE, 0);
- calendar.set(Calendar.SECOND, 0);
- calendar.set(Calendar.MILLISECOND, 0);
-
- archiveKey.setTimePartition(calendar.getTimeInMillis());
- }
-
- archiveKey.setDataType(chunk.getDataType());
- archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource() + "/" + chunk.getStreamName());
- archiveKey.setSeqId(chunk.getSeqID());
-
- ClientAck localClientAck = null;
- synchronized(lock)
- {
- localClientAck = SeqFileWriter.clientAck;
- log.info("[" + Thread.currentThread().getName() + "] Client >>>>>>>>>>>> Current Ack object ===>>>>" + localClientAck.toString());
- seqFileWriter.append(archiveKey, chunk);
-
- // compute size for stats
- dataSize += chunk.getData().length;
- }
- localClientAck.wait4Ack();
-
- if (localClientAck.getStatus() != ClientAck.OK)
- {
- log.warn("Exception after notyfyAll on the lock - Thread:" + Thread.currentThread().getName(),localClientAck.getException());
- throw new WriterException(localClientAck.getException());
- }
- else
- {
- // sucess
- writeChunkRetries = initWriteChunkRetries;
- }
- }
- catch (IOException e)
- {
- writeChunkRetries --;
- log.error("Could not save the chunk. ", e);
-
- if (writeChunkRetries < 0)
- {
- log.fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
- System.exit(-1);
- }
- throw new WriterException(e);
- }
- }
- }
-
- public void close()
- {
- synchronized (lock)
- {
- if (timer != null)
- timer.cancel();
- if (statTimer != null)
- statTimer.cancel();
- if (clientAckTimer != null)
- clientAckTimer.cancel();
- try {
-
- if (this.currentOutputStr != null)
- {
- this.currentOutputStr.close();
- }
-
- clientAck.releaseLock(ClientAck.OK, null);
- fs.rename(currentPath, new Path(currentFileName + ".done"));
- } catch (IOException e)
- {
- clientAck.releaseLock(ClientAck.OK, e);
- log.error("failed to close and rename stream", e);
- }
- }
- }
+public class SeqFileWriter implements ChukwaWriter {
+ public static final boolean ENABLE_ROTATION = true;
+
+ static final int STAT_INTERVAL_SECONDS = 30;
+ static final Object lock = new Object();
+
+ static Logger log = Logger.getLogger(SeqFileWriter.class);
+
+ private FileSystem fs = null;
+ private Configuration conf = null;
+
+ private String outputDir = null;
+ private Calendar calendar = Calendar.getInstance();
+
+ private Path currentPath = null;
+ private String currentFileName = null;
+ private FSDataOutputStream currentOutputStr = null;
+ private static SequenceFile.Writer seqFileWriter = null;
+
+ private static ClientAck clientAck = new ClientAck();
+ private static long nextRotate = 0;
+ private static int rotateInterval = 1000 * 60;
+
+ private static Timer clientAckTimer = null;
+
+ private Timer timer = null;
+
+ private Timer statTimer = null;
+ private volatile long dataSize = 0;
+
+ private int initWriteChunkRetries = 10;
+ private int writeChunkRetries = initWriteChunkRetries;
+ private boolean chunksWrittenThisRotate = false;
+
+ public SeqFileWriter() throws WriterException {
+ }
+
+ public void init(Configuration conf) throws WriterException {
+ outputDir = conf.get("chukwaCollector.outputDir", "/chukwa");
+
+ rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
+ 1000 * 60 * 5);// defaults to 5 minutes
+ nextRotate = System.currentTimeMillis() + rotateInterval;
+
+ initWriteChunkRetries = conf
+ .getInt("chukwaCollector.writeChunkRetries", 10);
+ writeChunkRetries = initWriteChunkRetries;
+
+ // check if they've told us the file system to use
+ String fsname = conf.get("writer.hdfs.filesystem");
+ if (fsname == null || fsname.equals("")) {
+ // otherwise try to get the filesystem from hadoop
+ fsname = conf.get("fs.default.name");
+ }
+
+ log.info("rotateInterval is " + rotateInterval);
+ log.info("outputDir is " + outputDir);
+ log.info("fsname is " + fsname);
+ log.info("filesystem type from core-default.xml is "
+ + conf.get("fs.hdfs.impl"));
+
+ if (fsname == null) {
+ log.error("no filesystem name");
+ throw new WriterException("no filesystem");
+ }
+ try {
+ fs = FileSystem.get(new URI(fsname), conf);
+ if (fs == null) {
+ log.error("can't connect to HDFS at " + fs.getUri());
+ return;
+ } else
+ log.info("filesystem is " + fs.getUri());
+ } catch (IOException e) {
+ log
+ .error(
+ "can't connect to HDFS, trying default file system instead (likely to be local)",
+ e);
+ try {
+ fs = FileSystem.get(conf);
+ } catch (IOException err) {
+ log.error("can't connect to default file system either", e);
+ }
+ } catch (URISyntaxException e) {
+ log.error("problem generating new URI from config setting");
+ return;
+ }
+
+ // Setup everything by rotating
+ rotate();
+
+ clientAckTimer = new Timer();
+ clientAckTimer.schedule(new TimerTask() {
+ public void run() {
+ synchronized (lock) {
+ ClientAck previous = clientAck;
+ SeqFileWriter.clientAck = new ClientAck();
+
+ try {
+ // SeqFile is uncompressed for now
+ // So we can flush every xx secs
+ // But if we're using block Compression
+ // this is not true anymore
+ // because this will trigger
+ // the compression
+ if (currentOutputStr != null) {
+ currentOutputStr.flush();
+ }
+ previous.releaseLock(ClientAck.OK, null);
+ long now = System.currentTimeMillis();
+ if (now >= nextRotate) {
+ nextRotate = System.currentTimeMillis() + rotateInterval;
+ rotate();
+ }
+ } catch (Throwable e) {
+ previous.releaseLock(ClientAck.KO, e);
+ log.warn("Exception when flushing ", e);
+ e.printStackTrace();
+ }
+ }
+ }
+
+ }, (5 * 1000), (5 * 1000));
+
+ statTimer = new Timer();
+ statTimer.schedule(new StatReportingTask(), 1000,
+ STAT_INTERVAL_SECONDS * 1000);
+
+ }
+
+ private class StatReportingTask extends TimerTask {
+ private long lastTs = System.currentTimeMillis();
+
+ public void run() {
+
+ long time = System.currentTimeMillis();
+ long currentDs = dataSize;
+ dataSize = 0;
+
+ long interval = time - lastTs;
+ lastTs = time;
+
+ long dataRate = 1000 * currentDs / interval; // kb/sec
+ log.info("stat:datacollection.writer.hdfs dataSize=" + currentDs
+ + " dataRate=" + dataRate);
+ }
+ };
+
+ void rotate() {
+ calendar.setTimeInMillis(System.currentTimeMillis());
+
+ log.info("start Date [" + calendar.getTime() + "]");
+ log.info("Rotate from " + Thread.currentThread().getName());
+
+ String newName = new java.text.SimpleDateFormat("yyyyddHHmmssSSS")
+ .format(calendar.getTime());
+ newName += "_" + new java.rmi.server.UID().toString();
+ newName = newName.replace("-", "");
+ newName = newName.replace(":", "");
+ newName = newName.replace(".", "");
+ newName = outputDir + "/" + newName.trim();
+
+ synchronized (lock) {
+ try {
+ FSDataOutputStream previousOutputStr = currentOutputStr;
+ Path previousPath = currentPath;
+ String previousFileName = currentFileName;
+
+ if (previousOutputStr != null) {
+ previousOutputStr.close();
+ if (chunksWrittenThisRotate) {
+ fs.rename(previousPath, new Path(previousFileName + ".done"));
+ } else {
+ log.info("no chunks written to " + previousPath + ", deleting");
+ fs.delete(previousPath, false);
+ }
+ }
+ Path newOutputPath = new Path(newName + ".chukwa");
+ FSDataOutputStream newOutputStr = fs.create(newOutputPath);
+ currentOutputStr = newOutputStr;
+ currentPath = newOutputPath;
+ currentFileName = newName;
+ chunksWrittenThisRotate = false;
+ // Uncompressed for now
+ seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
+ ChukwaArchiveKey.class, ChunkImpl.class,
+ SequenceFile.CompressionType.NONE, null);
+
+ } catch (IOException e) {
+ log.fatal("IO Exception in rotate. Exiting!");
+ e.printStackTrace();
+ // TODO
+ // As discussed for now:
+ // Everytime this happen in the past it was because HDFS was down,
+ // so there's nothing we can do
+ // Shutting down the collector for now
+ // Watchdog will re-start it automatically
+ System.exit(-1);
+ }
+ }
+
+ log.debug("finished rotate()");
+ }
+
+ // TODO merge the 2 add functions
+ @Override
+ public void add(List<Chunk> chunks) throws WriterException {
+ if (chunks != null) {
+ try {
+ chunksWrittenThisRotate = true;
+ ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+ // FIXME compute this once an hour
+ //
+ synchronized (calendar) {
+ calendar.setTimeInMillis(System.currentTimeMillis());
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+
+ archiveKey.setTimePartition(calendar.getTimeInMillis());
+ }
+
+ ClientAck localClientAck = null;
+ synchronized (lock) {
+ localClientAck = SeqFileWriter.clientAck;
+ for (Chunk chunk : chunks) {
+ archiveKey.setDataType(chunk.getDataType());
+ archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
+ + "/" + chunk.getStreamName());
+ archiveKey.setSeqId(chunk.getSeqID());
+
+ if (chunk != null) {
+ seqFileWriter.append(archiveKey, chunk);
+ // compute size for stats
+ dataSize += chunk.getData().length;
+ }
+
+ }
+ }// End synchro
+
+ localClientAck.wait4Ack();
+ if (localClientAck.getStatus() != ClientAck.OK) {
+ log
+ .warn("Exception after notyfyAll on the lock - Thread:"
+ + Thread.currentThread().getName(), localClientAck
+ .getException());
+ throw new WriterException(localClientAck.getException());
+ } else {
+ // sucess
+ writeChunkRetries = initWriteChunkRetries;
+ }
+
+ } catch (IOException e) {
+ writeChunkRetries--;
+ log.error("Could not save the chunk. ", e);
+
+ if (writeChunkRetries < 0) {
+ log
+ .fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
+ System.exit(-1);
+ }
+ throw new WriterException(e);
+ }
+ }
+
+ }
+
+ public void add(Chunk chunk) throws WriterException {
+
+ if (chunk != null) {
+ try {
+ ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+ // FIXME compute this once an hour
+ synchronized (calendar) {
+ calendar.setTimeInMillis(System.currentTimeMillis());
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+
+ archiveKey.setTimePartition(calendar.getTimeInMillis());
+ }
+
+ archiveKey.setDataType(chunk.getDataType());
+ archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
+ + "/" + chunk.getStreamName());
+ archiveKey.setSeqId(chunk.getSeqID());
+
+ ClientAck localClientAck = null;
+ synchronized (lock) {
+ localClientAck = SeqFileWriter.clientAck;
+ log.info("[" + Thread.currentThread().getName()
+ + "] Client >>>>>>>>>>>> Current Ack object ===>>>>"
+ + localClientAck.toString());
+ seqFileWriter.append(archiveKey, chunk);
+
+ // compute size for stats
+ dataSize += chunk.getData().length;
+ }
+ localClientAck.wait4Ack();
+
+ if (localClientAck.getStatus() != ClientAck.OK) {
+ log
+ .warn("Exception after notyfyAll on the lock - Thread:"
+ + Thread.currentThread().getName(), localClientAck
+ .getException());
+ throw new WriterException(localClientAck.getException());
+ } else {
+ // sucess
+ writeChunkRetries = initWriteChunkRetries;
+ }
+ } catch (IOException e) {
+ writeChunkRetries--;
+ log.error("Could not save the chunk. ", e);
+
+ if (writeChunkRetries < 0) {
+ log
+ .fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
+ System.exit(-1);
+ }
+ throw new WriterException(e);
+ }
+ }
+ }
+
+ public void close() {
+ synchronized (lock) {
+ if (timer != null)
+ timer.cancel();
+ if (statTimer != null)
+ statTimer.cancel();
+ if (clientAckTimer != null)
+ clientAckTimer.cancel();
+ try {
+
+ if (this.currentOutputStr != null) {
+ this.currentOutputStr.close();
+ }
+
+ clientAck.releaseLock(ClientAck.OK, null);
+ fs.rename(currentPath, new Path(currentFileName + ".done"));
+ } catch (IOException e) {
+ clientAck.releaseLock(ClientAck.OK, e);
+ log.error("failed to close and rename stream", e);
+ }
+ }
+ }
}
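SeqFileWriter takes its behaviour from the collector configuration: writer.hdfs.filesystem (falling back to fs.default.name), chukwaCollector.outputDir, chukwaCollector.rotateInterval in milliseconds, and chukwaCollector.writeChunkRetries. A hedged setup sketch using only the keys read in init() above; the values are examples, not recommendations.

    Configuration conf = new Configuration();
    conf.set("writer.hdfs.filesystem", "hdfs://namenode:9000");   // placeholder namenode URI
    conf.set("chukwaCollector.outputDir", "/chukwa");             // same as the built-in default
    conf.setInt("chukwaCollector.rotateInterval", 5 * 60 * 1000); // rotate every 5 minutes
    conf.setInt("chukwaCollector.writeChunkRetries", 10);

    ChukwaWriter writer = new SeqFileWriter(); // constructor declares WriterException
    writer.init(conf);                         // connects to the file system and does the first rotate()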
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java Wed Mar 11 22:39:26 2009
@@ -1,29 +1,26 @@
package org.apache.hadoop.chukwa.datacollection.writer;
-public class WriterException extends Exception
-{
- /**
+public class WriterException extends Exception {
+
+ /**
*
*/
- private static final long serialVersionUID = -4207275200546397145L;
+ private static final long serialVersionUID = -4207275200546397145L;
- public WriterException()
- {}
+ public WriterException() {
+ }
- public WriterException(String message)
- {
- super(message);
- }
-
- public WriterException(Throwable cause)
- {
- super(cause);
- }
-
- public WriterException(String message, Throwable cause)
- {
- super(message, cause);
- }
+ public WriterException(String message) {
+ super(message);
+ }
+
+ public WriterException(Throwable cause) {
+ super(cause);
+ }
+
+ public WriterException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChuckwaArchiveBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChuckwaArchiveBuilder.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChuckwaArchiveBuilder.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChuckwaArchiveBuilder.java Wed Mar 11 22:39:26 2009
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.chukwa.extraction.archive;
+
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.conf.Configuration;
@@ -33,78 +34,64 @@
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
-public class ChuckwaArchiveBuilder extends Configured implements Tool
-{
- static Logger log = Logger.getLogger(ChuckwaArchiveBuilder.class);
-
- static int printUsage()
- {
- System.out
- .println("ChuckwaArchiveBuilder <Daily/Hourly> <input> <output>");
- ToolRunner.printGenericCommandUsage(System.out);
- return -1;
- }
-
- public int run(String[] args) throws Exception
- {
-
-
- // Make sure there are exactly 3 parameters left.
- if (args.length != 3)
- {
- System.out.println("ERROR: Wrong number of parameters: "
- + args.length + " instead of 3.");
- return printUsage();
- }
-
- JobConf conf = new JobConf(getConf(), ChuckwaArchiveBuilder.class);
-
-
- conf.setInputFormat(SequenceFileInputFormat.class);
-
- conf.setMapperClass(IdentityMapper.class);
- conf.setReducerClass(IdentityReducer.class);
-
- if (args[0].equalsIgnoreCase("Daily"))
- {
- conf.setPartitionerClass(ChukwaArchiveDailyPartitioner.class);
- conf.setOutputFormat(ChukwaArchiveDailyOutputFormat.class);
- conf.setJobName("Chukwa-DailyArchiveBuilder");
- }
- else if (args[0].equalsIgnoreCase("Hourly"))
- {
- conf.setJobName("Chukwa-HourlyArchiveBuilder");
- conf.setPartitionerClass(ChukwaArchiveHourlyPartitioner.class);
- conf.setOutputFormat(ChukwaArchiveHourlyOutputFormat.class);
- }
- else
- {
- System.out.println("ERROR: Wrong Time partionning: "
- + args[0] + " instead of [Hourly/Daily].");
- return printUsage();
- }
-
-
- conf.setOutputKeyClass(ChukwaArchiveKey.class);
- conf.setOutputValueClass(ChunkImpl.class);
-
- //FIXME need compression - read config
- //conf.setCompressMapOutput(true);
- //conf.setMapOutputCompressorClass(LzoCodec.class);
-
- //
-
- FileInputFormat.setInputPaths(conf, args[1]);
- FileOutputFormat.setOutputPath(conf, new Path(args[2]));
-
- JobClient.runJob(conf);
- return 0;
- }
-
- public static void main(String[] args) throws Exception
- {
- int res = ToolRunner.run(new Configuration(),
- new ChuckwaArchiveBuilder(), args);
- System.exit(res);
- }
+public class ChuckwaArchiveBuilder extends Configured implements Tool {
+ static Logger log = Logger.getLogger(ChuckwaArchiveBuilder.class);
+
+ static int printUsage() {
+ System.out.println("ChuckwaArchiveBuilder <Daily/Hourly> <input> <output>");
+ ToolRunner.printGenericCommandUsage(System.out);
+ return -1;
+ }
+
+ public int run(String[] args) throws Exception {
+
+ // Make sure there are exactly 3 parameters left.
+ if (args.length != 3) {
+ System.out.println("ERROR: Wrong number of parameters: " + args.length
+ + " instead of 3.");
+ return printUsage();
+ }
+
+ JobConf conf = new JobConf(getConf(), ChuckwaArchiveBuilder.class);
+
+ conf.setInputFormat(SequenceFileInputFormat.class);
+
+ conf.setMapperClass(IdentityMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+
+ if (args[0].equalsIgnoreCase("Daily")) {
+ conf.setPartitionerClass(ChukwaArchiveDailyPartitioner.class);
+ conf.setOutputFormat(ChukwaArchiveDailyOutputFormat.class);
+ conf.setJobName("Chukwa-DailyArchiveBuilder");
+ } else if (args[0].equalsIgnoreCase("Hourly")) {
+ conf.setJobName("Chukwa-HourlyArchiveBuilder");
+ conf.setPartitionerClass(ChukwaArchiveHourlyPartitioner.class);
+ conf.setOutputFormat(ChukwaArchiveHourlyOutputFormat.class);
+ } else {
+ System.out.println("ERROR: Wrong Time partionning: " + args[0]
+ + " instead of [Hourly/Daily].");
+ return printUsage();
+ }
+
+ conf.setOutputKeyClass(ChukwaArchiveKey.class);
+ conf.setOutputValueClass(ChunkImpl.class);
+
+ // FIXME need compression - read config
+ // conf.setCompressMapOutput(true);
+ // conf.setMapOutputCompressorClass(LzoCodec.class);
+
+ //
+
+ FileInputFormat.setInputPaths(conf, args[1]);
+ FileOutputFormat.setOutputPath(conf, new Path(args[2]));
+
+ JobClient.runJob(conf);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new ChuckwaArchiveBuilder(),
+ args);
+ System.exit(res);
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.extraction.archive;
+
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.conf.Configuration;
@@ -34,99 +35,82 @@
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
-public class ChukwaArchiveBuilder extends Configured implements Tool
-{
- static Logger log = Logger.getLogger(ChukwaArchiveBuilder.class);
-
- static int printUsage()
- {
- System.out
- .println("ChuckwaArchiveBuilder <Stream/DataType/Daily/Hourly> <input> <output>");
- ToolRunner.printGenericCommandUsage(System.out);
- return -1;
- }
-
- public int run(String[] args) throws Exception
- {
-
-
- // Make sure there are exactly 3 parameters left.
- if (args.length != 3)
- {
- System.out.println("ERROR: Wrong number of parameters: "
- + args.length + " instead of 3.");
- return printUsage();
- }
-
- JobConf jobConf = new JobConf(getConf(), ChukwaArchiveBuilder.class);
- jobConf.addResource(new Path("conf/chukwa-demux-conf.xml"));
-
- jobConf.setInputFormat(SequenceFileInputFormat.class);
-
- jobConf.setMapperClass(IdentityMapper.class);
- jobConf.setReducerClass(IdentityReducer.class);
-
- if (args[0].equalsIgnoreCase("Daily"))
- {
- jobConf.setPartitionerClass(ChukwaArchiveDailyPartitioner.class);
- jobConf.setOutputFormat(ChukwaArchiveDailyOutputFormat.class);
- jobConf.setJobName("Chukwa-DailyArchiveBuilder");
- }
- else if (args[0].equalsIgnoreCase("Hourly"))
- {
- jobConf.setJobName("Chukwa-HourlyArchiveBuilder");
- jobConf.setPartitionerClass(ChukwaArchiveHourlyPartitioner.class);
- jobConf.setOutputFormat(ChukwaArchiveHourlyOutputFormat.class);
- }
- else if (args[0].equalsIgnoreCase("DataType"))
- {
+public class ChukwaArchiveBuilder extends Configured implements Tool {
+ static Logger log = Logger.getLogger(ChukwaArchiveBuilder.class);
+
+ static int printUsage() {
+ System.out
+ .println("ChuckwaArchiveBuilder <Stream/DataType/Daily/Hourly> <input> <output>");
+ ToolRunner.printGenericCommandUsage(System.out);
+ return -1;
+ }
+
+ public int run(String[] args) throws Exception {
+
+ // Make sure there are exactly 3 parameters left.
+ if (args.length != 3) {
+ System.out.println("ERROR: Wrong number of parameters: " + args.length
+ + " instead of 3.");
+ return printUsage();
+ }
+
+ JobConf jobConf = new JobConf(getConf(), ChukwaArchiveBuilder.class);
+ jobConf.addResource(new Path("conf/chukwa-demux-conf.xml"));
+
+ jobConf.setInputFormat(SequenceFileInputFormat.class);
+
+ jobConf.setMapperClass(IdentityMapper.class);
+ jobConf.setReducerClass(IdentityReducer.class);
+
+ if (args[0].equalsIgnoreCase("Daily")) {
+ jobConf.setPartitionerClass(ChukwaArchiveDailyPartitioner.class);
+ jobConf.setOutputFormat(ChukwaArchiveDailyOutputFormat.class);
+ jobConf.setJobName("Chukwa-DailyArchiveBuilder");
+ } else if (args[0].equalsIgnoreCase("Hourly")) {
+ jobConf.setJobName("Chukwa-HourlyArchiveBuilder");
+ jobConf.setPartitionerClass(ChukwaArchiveHourlyPartitioner.class);
+ jobConf.setOutputFormat(ChukwaArchiveHourlyOutputFormat.class);
+ } else if (args[0].equalsIgnoreCase("DataType")) {
jobConf.setJobName("Chukwa-HourlyArchiveBuilder-DataType");
int reduceCount = jobConf.getInt("chukwaArchiveBuilder.reduceCount", 1);
log.info("Reduce Count:" + reduceCount);
jobConf.setNumReduceTasks(reduceCount);
-
+
jobConf.setPartitionerClass(ChukwaArchiveDataTypePartitioner.class);
- jobConf.setOutputFormat(ChukwaArchiveDataTypeOutputFormat.class);
- }
- else if (args[0].equalsIgnoreCase("Stream"))
- {
+ jobConf.setOutputFormat(ChukwaArchiveDataTypeOutputFormat.class);
+ } else if (args[0].equalsIgnoreCase("Stream")) {
jobConf.setJobName("Chukwa-HourlyArchiveBuilder-Stream");
int reduceCount = jobConf.getInt("chukwaArchiveBuilder.reduceCount", 1);
log.info("Reduce Count:" + reduceCount);
jobConf.setNumReduceTasks(reduceCount);
-
+
jobConf.setPartitionerClass(ChukwaArchiveStreamNamePartitioner.class);
- jobConf.setOutputFormat(ChukwaArchiveStreamNameOutputFormat.class);
+ jobConf.setOutputFormat(ChukwaArchiveStreamNameOutputFormat.class);
+ } else {
+ System.out.println("ERROR: Wrong Time partionning: " + args[0]
+ + " instead of [Stream/DataType/Hourly/Daily].");
+ return printUsage();
}
- else
- {
- System.out.println("ERROR: Wrong Time partionning: "
- + args[0] + " instead of [Stream/DataType/Hourly/Daily].");
- return printUsage();
- }
-
-
- jobConf.set("mapred.compress.map.output", "true");
- jobConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.LzoCodec");
- jobConf.set("mapred.output.compress", "true");
- jobConf.set("mapred.output.compression.type", "BLOCK");
-
-
-
- jobConf.setOutputKeyClass(ChukwaArchiveKey.class);
- jobConf.setOutputValueClass(ChunkImpl.class);
-
- FileInputFormat.setInputPaths(jobConf, args[1]);
- FileOutputFormat.setOutputPath(jobConf, new Path(args[2]));
-
- JobClient.runJob(jobConf);
- return 0;
- }
-
- public static void main(String[] args) throws Exception
- {
- int res = ToolRunner.run(new Configuration(),
- new ChukwaArchiveBuilder(), args);
- System.exit(res);
- }
+
+ jobConf.set("mapred.compress.map.output", "true");
+ jobConf.set("mapred.map.output.compression.codec",
+ "org.apache.hadoop.io.compress.LzoCodec");
+ jobConf.set("mapred.output.compress", "true");
+ jobConf.set("mapred.output.compression.type", "BLOCK");
+
+ jobConf.setOutputKeyClass(ChukwaArchiveKey.class);
+ jobConf.setOutputValueClass(ChunkImpl.class);
+
+ FileInputFormat.setInputPaths(jobConf, args[1]);
+ FileOutputFormat.setOutputPath(jobConf, new Path(args[2]));
+
+ JobClient.runJob(jobConf);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new ChukwaArchiveBuilder(),
+ args);
+ System.exit(res);
+ }
}
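ChukwaArchiveBuilder is a standard Hadoop Tool: the first argument picks the partitioning (Stream, DataType, Hourly or Daily) and the other two are the input and output paths. A hedged invocation sketch mirroring main() above; the paths are placeholders.

    String[] args = { "DataType", "/chukwa/dataSink", "/chukwa/archives" }; // placeholder paths
    int res = ToolRunner.run(new Configuration(), new ChukwaArchiveBuilder(), args);
    System.exit(res);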
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java Wed Mar 11 22:39:26 2009
@@ -20,26 +20,25 @@
import java.text.SimpleDateFormat;
-
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
import org.apache.log4j.Logger;
-public class ChukwaArchiveDailyOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl>
-{
- static Logger log = Logger.getLogger(ChukwaArchiveDailyOutputFormat.class);
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
-
-
- @Override
- protected String generateFileNameForKeyValue(ChukwaArchiveKey key, ChunkImpl chunk,
- String name)
- {
-
- if (log.isDebugEnabled())
- {log.debug("ChukwaArchiveOutputFormat.fileName: " + sdf.format(key.getTimePartition()));}
-
- return sdf.format(key.getTimePartition()) + ".arc";
- }
+public class ChukwaArchiveDailyOutputFormat extends
+ MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl> {
+ static Logger log = Logger.getLogger(ChukwaArchiveDailyOutputFormat.class);
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
+
+ @Override
+ protected String generateFileNameForKeyValue(ChukwaArchiveKey key,
+ ChunkImpl chunk, String name) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("ChukwaArchiveOutputFormat.fileName: "
+ + sdf.format(key.getTimePartition()));
+ }
+
+ return sdf.format(key.getTimePartition()) + ".arc";
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyPartitioner.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyPartitioner.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyPartitioner.java Wed Mar 11 22:39:26 2009
@@ -18,25 +18,25 @@
package org.apache.hadoop.chukwa.extraction.archive;
-import java.text.SimpleDateFormat;
+import java.text.SimpleDateFormat;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
-public class ChukwaArchiveDailyPartitioner<K, V>
- implements Partitioner<ChukwaArchiveKey,ChunkImpl>
-{
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
-
- public void configure(JobConf arg0)
- {}
-
- public int getPartition(ChukwaArchiveKey key,ChunkImpl chunl, int numReduceTasks)
- {
-
- return (sdf.format(key.getTimePartition()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
- }
+public class ChukwaArchiveDailyPartitioner<K, V> implements
+ Partitioner<ChukwaArchiveKey, ChunkImpl> {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
+
+ public void configure(JobConf arg0) {
+ }
+
+ public int getPartition(ChukwaArchiveKey key, ChunkImpl chunl,
+ int numReduceTasks) {
+
+ return (sdf.format(key.getTimePartition()).hashCode() & Integer.MAX_VALUE)
+ % numReduceTasks;
+ }
}
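The daily partitioner routes a chunk by the day component of its time partition; masking the hash with Integer.MAX_VALUE keeps it non-negative before the modulo. A hedged one-liner showing the same arithmetic outside the Partitioner; the reducer count and timestamp are arbitrary.

    java.text.SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyy_MM_dd");
    int numReduceTasks = 4;
    long timePartition = System.currentTimeMillis(); // stand-in for key.getTimePartition()
    int partition = (sdf.format(timePartition).hashCode() & Integer.MAX_VALUE) % numReduceTasks;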
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java Wed Mar 11 22:39:26 2009
@@ -20,26 +20,26 @@
import java.text.SimpleDateFormat;
-
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
import org.apache.log4j.Logger;
-public class ChukwaArchiveDataTypeOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl>
-{
- static Logger log = Logger.getLogger(ChukwaArchiveDataTypeOutputFormat.class);
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
-
-
- @Override
- protected String generateFileNameForKeyValue(ChukwaArchiveKey key, ChunkImpl chunk,
- String name)
- {
-
- if (log.isDebugEnabled())
- {log.debug("ChukwaArchiveOutputFormat.fileName: " + sdf.format(key.getTimePartition()));}
-
- return chunk.getDataType() + "_" +sdf.format(key.getTimePartition()) + ".arc";
- }
+public class ChukwaArchiveDataTypeOutputFormat extends
+ MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl> {
+ static Logger log = Logger.getLogger(ChukwaArchiveDataTypeOutputFormat.class);
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
+
+ @Override
+ protected String generateFileNameForKeyValue(ChukwaArchiveKey key,
+ ChunkImpl chunk, String name) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("ChukwaArchiveOutputFormat.fileName: "
+ + sdf.format(key.getTimePartition()));
+ }
+
+ return chunk.getDataType() + "_" + sdf.format(key.getTimePartition())
+ + ".arc";
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java Wed Mar 11 22:39:26 2009
@@ -18,25 +18,26 @@
package org.apache.hadoop.chukwa.extraction.archive;
-import java.text.SimpleDateFormat;
+import java.text.SimpleDateFormat;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
-public class ChukwaArchiveDataTypePartitioner<K, V>
- implements Partitioner<ChukwaArchiveKey,ChunkImpl>
-{
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
-
- public void configure(JobConf arg0)
- {}
-
- public int getPartition(ChukwaArchiveKey key,ChunkImpl chunk, int numReduceTasks)
- {
-
- return ( (chunk.getDataType() + "_" +sdf.format(key.getTimePartition() )).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
- }
+public class ChukwaArchiveDataTypePartitioner<K, V> implements
+ Partitioner<ChukwaArchiveKey, ChunkImpl> {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
+
+ public void configure(JobConf arg0) {
+ }
+
+ public int getPartition(ChukwaArchiveKey key, ChunkImpl chunk,
+ int numReduceTasks) {
+
+ return ((chunk.getDataType() + "_" + sdf.format(key.getTimePartition()))
+ .hashCode() & Integer.MAX_VALUE)
+ % numReduceTasks;
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyOutputFormat.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyOutputFormat.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyOutputFormat.java Wed Mar 11 22:39:26 2009
@@ -20,24 +20,24 @@
import java.text.SimpleDateFormat;
-
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
import org.apache.log4j.Logger;
-public class ChukwaArchiveHourlyOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl>
-{
- static Logger log = Logger.getLogger(ChukwaArchiveHourlyOutputFormat.class);
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd_HH-00");
-
- @Override
- protected String generateFileNameForKeyValue(ChukwaArchiveKey key, ChunkImpl chunk,
- String name)
- {
-
- if (log.isDebugEnabled())
- {log.debug("ChukwaArchiveOutputFormat.fileName: " + sdf.format(key.getTimePartition()));}
- return sdf.format(key.getTimePartition()) + ".arc";
- }
+public class ChukwaArchiveHourlyOutputFormat extends
+ MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl> {
+ static Logger log = Logger.getLogger(ChukwaArchiveHourlyOutputFormat.class);
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd_HH-00");
+
+ @Override
+ protected String generateFileNameForKeyValue(ChukwaArchiveKey key,
+ ChunkImpl chunk, String name) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("ChukwaArchiveOutputFormat.fileName: "
+ + sdf.format(key.getTimePartition()));
+ }
+ return sdf.format(key.getTimePartition()) + ".arc";
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyPartitioner.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyPartitioner.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyPartitioner.java Wed Mar 11 22:39:26 2009
@@ -18,25 +18,25 @@
package org.apache.hadoop.chukwa.extraction.archive;
-import java.text.SimpleDateFormat;
+import java.text.SimpleDateFormat;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
-public class ChukwaArchiveHourlyPartitioner<K, V>
- implements Partitioner<ChukwaArchiveKey,ChunkImpl>
-{
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd_HH-00");
-
- public void configure(JobConf arg0)
- {}
-
- public int getPartition(ChukwaArchiveKey key,ChunkImpl chunl, int numReduceTasks)
- {
-
- return (sdf.format(key.getTimePartition()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
- }
+public class ChukwaArchiveHourlyPartitioner<K, V> implements
+ Partitioner<ChukwaArchiveKey, ChunkImpl> {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd_HH-00");
+
+ public void configure(JobConf arg0) {
+ }
+
+ public int getPartition(ChukwaArchiveKey key, ChunkImpl chunl,
+ int numReduceTasks) {
+
+ return (sdf.format(key.getTimePartition()).hashCode() & Integer.MAX_VALUE)
+ % numReduceTasks;
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java Wed Mar 11 22:39:26 2009
@@ -1,6 +1,6 @@
package org.apache.hadoop.chukwa.extraction.archive;
-public class ChukwaArchiveMerger
-{
+
+public class ChukwaArchiveMerger {
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNameOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNameOutputFormat.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNameOutputFormat.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNameOutputFormat.java Wed Mar 11 22:39:26 2009
@@ -23,13 +23,12 @@
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
-public class ChukwaArchiveStreamNameOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl>
-{
+public class ChukwaArchiveStreamNameOutputFormat extends
+ MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl> {
@Override
- protected String generateLeafFileName(String name)
- {
+ protected String generateLeafFileName(String name) {
return "chukwaArchive-" + super.generateLeafFileName(name);
}
-
+
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNamePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNamePartitioner.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNamePartitioner.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNamePartitioner.java Wed Mar 11 22:39:26 2009
@@ -18,21 +18,22 @@
package org.apache.hadoop.chukwa.extraction.archive;
+
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
-public class ChukwaArchiveStreamNamePartitioner<K, V>
- implements Partitioner<ChukwaArchiveKey,ChunkImpl>
-{
- public void configure(JobConf arg0)
- {}
-
- public int getPartition(ChukwaArchiveKey key,ChunkImpl chunk, int numReduceTasks)
- {
-
- return ( (chunk.getSource() + "/" + chunk.getStreamName()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
- }
+public class ChukwaArchiveStreamNamePartitioner<K, V> implements
+ Partitioner<ChukwaArchiveKey, ChunkImpl> {
+ public void configure(JobConf arg0) {
+ }
+
+ public int getPartition(ChukwaArchiveKey key, ChunkImpl chunk,
+ int numReduceTasks) {
+
+ return ((chunk.getSource() + "/" + chunk.getStreamName()).hashCode() & Integer.MAX_VALUE)
+ % numReduceTasks;
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBException.java Wed Mar 11 22:39:26 2009
@@ -1,30 +1,26 @@
package org.apache.hadoop.chukwa.extraction.database;
-public class DBException extends Exception
-{
- /**
+public class DBException extends Exception {
+
+ /**
*
*/
- private static final long serialVersionUID = -4509384580029389936L;
+ private static final long serialVersionUID = -4509384580029389936L;
+
+ public DBException() {
+ }
- public DBException()
- {
- }
-
- public DBException(String message)
- {
- super(message);
- }
-
- public DBException(Throwable cause)
- {
- super(cause);
- }
-
- public DBException(String message, Throwable cause)
- {
- super(message, cause);
- }
+ public DBException(String message) {
+ super(message);
+ }
+
+ public DBException(Throwable cause) {
+ super(cause);
+ }
+
+ public DBException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBPlugin.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBPlugin.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DBPlugin.java Wed Mar 11 22:39:26 2009
@@ -1,9 +1,8 @@
package org.apache.hadoop.chukwa.extraction.database;
+
import org.apache.hadoop.io.SequenceFile;
-public interface DBPlugin
-{
- void process(SequenceFile.Reader reader)
- throws DBException;
+public interface DBPlugin {
+ void process(SequenceFile.Reader reader) throws DBException;
}
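
As a rough sketch of how the DBPlugin contract above is meant to be used (not part of the committed source): an implementation reads ChukwaRecordKey/ChukwaRecord pairs off the supplied SequenceFile.Reader and wraps failures in DBException, following the same loop MRJobCounters uses later in this commit. The class name and the per-record action here are hypothetical.

package org.apache.hadoop.chukwa.extraction.database;

import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.io.SequenceFile;

public class PrintingDBPlugin implements DBPlugin {
  public void process(SequenceFile.Reader reader) throws DBException {
    ChukwaRecordKey key = new ChukwaRecordKey();
    ChukwaRecord record = new ChukwaRecord();
    try {
      // Iterate every record in the sequence file handed to the plugin.
      while (reader.next(key, record)) {
        System.out.println(key + " @ " + record.getTime());
      }
    } catch (Exception e) {
      // Surface any read failure through the checked DBException above.
      throw new DBException(e);
    }
  }
}
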
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java Wed Mar 11 22:39:26 2009
@@ -1,10 +1,10 @@
package org.apache.hadoop.chukwa.extraction.database;
+
import java.io.IOException;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.HashMap;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
@@ -13,125 +13,105 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-public class DatabaseLoader
-{
+public class DatabaseLoader {
- static HashMap<String, String> hashDatasources = new HashMap<String, String>();
- static ChukwaConfiguration conf = null;
- static FileSystem fs = null;
-
- private static Log log = LogFactory.getLog(DatabaseLoader.class);
-
- /**
- * @param args
- * @throws URISyntaxException
- * @throws IOException
- */
- public static void main(String[] args) throws IOException,
- URISyntaxException
- {
- //FIXME quick implementation to be able to load data into database
-
- System.out.println("Input directory:" + args[0]);
-
-
- for(int i=1;i<args.length;i++)
- {
- hashDatasources.put(args[i], "");
- }
-
- conf = new ChukwaConfiguration();
- fs = FileSystem.get(conf);
- Path demuxDir = new Path(args[0]);
- FileStatus fstat = fs.getFileStatus(demuxDir);
-
- if (!fstat.isDir())
- {
- throw new IOException(args[0] + " is not a directory!");
- }
- else
- {
- // cluster Directory
- FileStatus[] clusterDirectories = fs.listStatus(demuxDir);
- for (FileStatus clusterDirectory : clusterDirectories)
- {
- FileStatus[] datasourceDirectories = fs.listStatus(clusterDirectory.getPath());
-
- String directoryName = null;
- for (FileStatus datasourceDirectory : datasourceDirectories)
- {
- directoryName = datasourceDirectory.getPath().getName();
- if (directoryName.equals("_log") || (!hashDatasources.containsKey(directoryName)))
- {
- log.info("Skipping this directory:" + directoryName );
- continue;
- }
- try
- {
- processDS(clusterDirectory.getPath().getName(),datasourceDirectory.getPath());
- }
- catch(Exception e)
- {
- e.printStackTrace();
- log.warn("Exception in DatabaseLoader:" ,e);
- }
- }
- }
-
- System.exit(0);
- }
- }
-
- static void processDS(String cluster, Path datasourcePath) throws IOException
- {
- Path srcDir = datasourcePath;
- FileStatus fstat = fs.getFileStatus(srcDir);
-
- if (!fstat.isDir())
- {
- throw new IOException(datasourcePath.getName() + " is not a directory!");
- } else
- {
- FileStatus[] datasourceDirectories = fs.listStatus(srcDir,new EventFileFilter());
- for (FileStatus datasourceDirectory : datasourceDirectories)
- {
- String dataSource = datasourceDirectory.getPath().getName();
- dataSource = dataSource.substring(0,dataSource.indexOf('_'));
-
-
- // Need to rename if we want todo some processing in para.
- //
- // Maybe the file has already been processed by another loader
- if (fs.exists(datasourceDirectory.getPath()))
- {
-
- log.info("Processing: " + datasourceDirectory.getPath().getName());
-
- try
- {
- MetricDataLoader mdl = new MetricDataLoader(cluster);
- mdl.process(datasourceDirectory.getPath());
- } catch (SQLException e)
- {
- e.printStackTrace();
- log.warn("SQLException in MetricDataLoader:" ,e);
- } catch (URISyntaxException e)
- {
- e.printStackTrace();
- log.warn("Exception in MetricDataLoader:" ,e);
- }
-
- log.info("Processed: " + datasourceDirectory.getPath().getName());
- }
- } // End for(FileStatus datasourceDirectory :datasourceDirectories)
- } // End Else
- }
+ static HashMap<String, String> hashDatasources = new HashMap<String, String>();
+ static ChukwaConfiguration conf = null;
+ static FileSystem fs = null;
+
+ private static Log log = LogFactory.getLog(DatabaseLoader.class);
+
+ /**
+ * @param args
+ * @throws URISyntaxException
+ * @throws IOException
+ */
+ public static void main(String[] args) throws IOException, URISyntaxException {
+ // FIXME quick implementation to be able to load data into database
+
+ System.out.println("Input directory:" + args[0]);
+
+ for (int i = 1; i < args.length; i++) {
+ hashDatasources.put(args[i], "");
+ }
+
+ conf = new ChukwaConfiguration();
+ fs = FileSystem.get(conf);
+ Path demuxDir = new Path(args[0]);
+ FileStatus fstat = fs.getFileStatus(demuxDir);
+
+ if (!fstat.isDir()) {
+ throw new IOException(args[0] + " is not a directory!");
+ } else {
+ // cluster Directory
+ FileStatus[] clusterDirectories = fs.listStatus(demuxDir);
+ for (FileStatus clusterDirectory : clusterDirectories) {
+ FileStatus[] datasourceDirectories = fs.listStatus(clusterDirectory
+ .getPath());
+
+ String directoryName = null;
+ for (FileStatus datasourceDirectory : datasourceDirectories) {
+ directoryName = datasourceDirectory.getPath().getName();
+ if (directoryName.equals("_log")
+ || (!hashDatasources.containsKey(directoryName))) {
+ log.info("Skipping this directory:" + directoryName);
+ continue;
+ }
+ try {
+ processDS(clusterDirectory.getPath().getName(), datasourceDirectory
+ .getPath());
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.warn("Exception in DatabaseLoader:", e);
+ }
+ }
+ }
+
+ System.exit(0);
+ }
+ }
+
+ static void processDS(String cluster, Path datasourcePath) throws IOException {
+ Path srcDir = datasourcePath;
+ FileStatus fstat = fs.getFileStatus(srcDir);
+
+ if (!fstat.isDir()) {
+ throw new IOException(datasourcePath.getName() + " is not a directory!");
+ } else {
+ FileStatus[] datasourceDirectories = fs.listStatus(srcDir,
+ new EventFileFilter());
+ for (FileStatus datasourceDirectory : datasourceDirectories) {
+ String dataSource = datasourceDirectory.getPath().getName();
+ dataSource = dataSource.substring(0, dataSource.indexOf('_'));
+
+ // Need to rename if we want todo some processing in para.
+ //
+ // Maybe the file has already been processed by another loader
+ if (fs.exists(datasourceDirectory.getPath())) {
+
+ log.info("Processing: " + datasourceDirectory.getPath().getName());
+
+ try {
+ MetricDataLoader mdl = new MetricDataLoader(cluster);
+ mdl.process(datasourceDirectory.getPath());
+ } catch (SQLException e) {
+ e.printStackTrace();
+ log.warn("SQLException in MetricDataLoader:", e);
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ log.warn("Exception in MetricDataLoader:", e);
+ }
+
+ log.info("Processed: " + datasourceDirectory.getPath().getName());
+ }
+ } // End for(FileStatus datasourceDirectory :datasourceDirectories)
+ } // End Else
+ }
}
-class EventFileFilter implements PathFilter
-{
- public boolean accept(Path path)
- {
- return (path.toString().endsWith(".evt"));
- }
+
+class EventFileFilter implements PathFilter {
+ public boolean accept(Path path) {
+ return (path.toString().endsWith(".evt"));
+ }
}
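
DatabaseLoader.main above takes the demux output directory as args[0] and the datasource names to load as the remaining arguments; it walks each cluster directory, skips "_log" and any datasource not listed, and hands every ".evt" file to MetricDataLoader. A minimal, self-contained sketch (not part of the committed source) of the file filter and the datasource-name extraction it applies; the file name is hypothetical:

public class EvtNameSketch {
  public static void main(String[] args) {
    // Hypothetical .evt file name produced by demux.
    String name = "SystemMetrics_20090311.evt";
    // Same test as EventFileFilter.accept() above.
    if (name.endsWith(".evt")) {
      // Same extraction as processDS(): everything before the first '_'.
      String dataSource = name.substring(0, name.indexOf('_'));
      System.out.println(dataSource); // prints "SystemMetrics"
    }
  }
}
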
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MRJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MRJobCounters.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MRJobCounters.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MRJobCounters.java Wed Mar 11 22:39:26 2009
@@ -1,74 +1,74 @@
package org.apache.hadoop.chukwa.extraction.database;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.io.SequenceFile.Reader;
-public class MRJobCounters implements DBPlugin
-{
- private static Log log = LogFactory.getLog(MRJobCounters.class);
-
- static final String[] fields =
- {"FILE_SYSTEMS_HDFS_BYTES_READ","FILE_SYSTEMS_HDFS_BYTES_WRITTEN",
- "FILE_SYSTEMS_LOCAL_BYTES_READ","FILE_SYSTEMS_LOCAL_BYTES_WRITTEN","HodId",
- "JOB_COUNTERS__DATA-LOCAL_MAP_TASKS","JOB_COUNTERS__LAUNCHED_MAP_TASKS",
- "JOB_COUNTERS__LAUNCHED_REDUCE_TASKS","JOB_COUNTERS__RACK-LOCAL_MAP_TASKS",
- "JobId","MAP-REDUCE_FRAMEWORK_COMBINE_INPUT_RECORDS","MAP-REDUCE_FRAMEWORK_COMBINE_OUTPUT_RECORDS",
- "MAP-REDUCE_FRAMEWORK_MAP_INPUT_BYTES","MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS",
- "MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES","MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS",
- "MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES","MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS",
- "MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_GROUPS","MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_RECORDS",
- "MAP-REDUCE_FRAMEWORK_REDUCE_OUTPUT_RECORDS"};
-// [FILE_SYSTEMS_HDFS_BYTES_READ] :801280331655
-// [FILE_SYSTEMS_HDFS_BYTES_WRITTEN] :44142889
-// [FILE_SYSTEMS_LOCAL_BYTES_READ] :1735570776310
-// [FILE_SYSTEMS_LOCAL_BYTES_WRITTEN] :2610893176016
-// [HodId] :0.0
-// [JOB_COUNTERS__DATA-LOCAL_MAP_TASKS] :5545
-// [JOB_COUNTERS__LAUNCHED_MAP_TASKS] :5912
-// [JOB_COUNTERS__LAUNCHED_REDUCE_TASKS] :739
-// [JOB_COUNTERS__RACK-LOCAL_MAP_TASKS] :346
-// [JobId] :2.008042104030008E15
-// [MAP-REDUCE_FRAMEWORK_COMBINE_INPUT_RECORDS] :0
-// [MAP-REDUCE_FRAMEWORK_COMBINE_OUTPUT_RECORDS] :0
-// [MAP-REDUCE_FRAMEWORK_MAP_INPUT_BYTES] :801273929542
-// [MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS] :9406887059
-// [MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES] :784109666437
-// [MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS] :9406887059
-// [MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_GROUPS] :477623
-// [MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_RECORDS] :739000
-// [MAP-REDUCE_FRAMEWORK_REDUCE_OUTPUT_RECORDS] :739000
-
-
- @Override
- public void process(Reader reader) throws DBException
- {
- ChukwaRecordKey key = new ChukwaRecordKey();
- ChukwaRecord record = new ChukwaRecord();
- try
- {
- StringBuilder sb = new StringBuilder();
- while (reader.next(key, record))
- {
-
- sb.append("insert into MRJobCounters ");
- for (String field :fields)
- {
- sb.append(" set ").append(field).append(" = ").append(record.getValue(field)).append(", ");
- }
- sb.append(" set timestamp =").append( record.getTime()).append(";\n");
- }
- System.out.println(sb.toString());
- }
- catch (Exception e)
- {
- log.error("Unable to insert data into database"
- + e.getMessage());
- e.printStackTrace();
- }
+public class MRJobCounters implements DBPlugin {
+ private static Log log = LogFactory.getLog(MRJobCounters.class);
+
+ static final String[] fields = { "FILE_SYSTEMS_HDFS_BYTES_READ",
+ "FILE_SYSTEMS_HDFS_BYTES_WRITTEN", "FILE_SYSTEMS_LOCAL_BYTES_READ",
+ "FILE_SYSTEMS_LOCAL_BYTES_WRITTEN", "HodId",
+ "JOB_COUNTERS__DATA-LOCAL_MAP_TASKS", "JOB_COUNTERS__LAUNCHED_MAP_TASKS",
+ "JOB_COUNTERS__LAUNCHED_REDUCE_TASKS",
+ "JOB_COUNTERS__RACK-LOCAL_MAP_TASKS", "JobId",
+ "MAP-REDUCE_FRAMEWORK_COMBINE_INPUT_RECORDS",
+ "MAP-REDUCE_FRAMEWORK_COMBINE_OUTPUT_RECORDS",
+ "MAP-REDUCE_FRAMEWORK_MAP_INPUT_BYTES",
+ "MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS",
+ "MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES",
+ "MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS",
+ "MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES",
+ "MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS",
+ "MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_GROUPS",
+ "MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_RECORDS",
+ "MAP-REDUCE_FRAMEWORK_REDUCE_OUTPUT_RECORDS" };
+
+ // [FILE_SYSTEMS_HDFS_BYTES_READ] :801280331655
+ // [FILE_SYSTEMS_HDFS_BYTES_WRITTEN] :44142889
+ // [FILE_SYSTEMS_LOCAL_BYTES_READ] :1735570776310
+ // [FILE_SYSTEMS_LOCAL_BYTES_WRITTEN] :2610893176016
+ // [HodId] :0.0
+ // [JOB_COUNTERS__DATA-LOCAL_MAP_TASKS] :5545
+ // [JOB_COUNTERS__LAUNCHED_MAP_TASKS] :5912
+ // [JOB_COUNTERS__LAUNCHED_REDUCE_TASKS] :739
+ // [JOB_COUNTERS__RACK-LOCAL_MAP_TASKS] :346
+ // [JobId] :2.008042104030008E15
+ // [MAP-REDUCE_FRAMEWORK_COMBINE_INPUT_RECORDS] :0
+ // [MAP-REDUCE_FRAMEWORK_COMBINE_OUTPUT_RECORDS] :0
+ // [MAP-REDUCE_FRAMEWORK_MAP_INPUT_BYTES] :801273929542
+ // [MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS] :9406887059
+ // [MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES] :784109666437
+ // [MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS] :9406887059
+ // [MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_GROUPS] :477623
+ // [MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_RECORDS] :739000
+ // [MAP-REDUCE_FRAMEWORK_REDUCE_OUTPUT_RECORDS] :739000
+
+ @Override
+ public void process(Reader reader) throws DBException {
+ ChukwaRecordKey key = new ChukwaRecordKey();
+ ChukwaRecord record = new ChukwaRecord();
+ try {
+ StringBuilder sb = new StringBuilder();
+ while (reader.next(key, record)) {
+
+ sb.append("insert into MRJobCounters ");
+ for (String field : fields) {
+ sb.append(" set ").append(field).append(" = ").append(
+ record.getValue(field)).append(", ");
+ }
+ sb.append(" set timestamp =").append(record.getTime()).append(";\n");
+ }
+ System.out.println(sb.toString());
+ } catch (Exception e) {
+ log.error("Unable to insert data into database" + e.getMessage());
+ e.printStackTrace();
+ }
- }
+ }
}
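
A minimal, self-contained sketch (not part of the committed source) of the statement shape MRJobCounters.process() builds per record above; the two field names and values here stand in for the full counters list, which the real class pulls out of each ChukwaRecord. Note the committed class only prints the accumulated statements to stdout rather than executing them.

public class CountersStatementSketch {
  public static void main(String[] args) {
    // Hypothetical subset of the counter fields and values.
    String[] fields = { "JOB_COUNTERS__LAUNCHED_MAP_TASKS",
        "JOB_COUNTERS__LAUNCHED_REDUCE_TASKS" };
    String[] values = { "5912", "739" };

    StringBuilder sb = new StringBuilder();
    sb.append("insert into MRJobCounters ");
    for (int i = 0; i < fields.length; i++) {
      // Same per-field "set" clause as the loop in process().
      sb.append(" set ").append(fields[i]).append(" = ").append(values[i]).append(", ");
    }
    sb.append(" set timestamp =").append(1236811166000L).append(";\n");
    System.out.println(sb.toString());
  }
}
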