You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/05/27 00:14:52 UTC
svn commit: r778911 - in /hadoop/chukwa/trunk/src:
java/org/apache/hadoop/chukwa/datacollection/writer/localfs/
test/org/apache/hadoop/chukwa/datacollection/writer/
Author: asrabkin
Date: Tue May 26 22:14:51 2009
New Revision: 778911
URL: http://svn.apache.org/viewvc?rev=778911&view=rev
Log:
CHUKWA-30. Missed some files.
Added:
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java?rev=778911&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java Tue May 26 22:14:51 2009
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer.localfs;
+
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class is used by LocalWriter.java.
+ *
+ * The only role of this class is to move dataSink files
+ * from the local file system to the remote HDFS.
+ *
+ * Those 2 classes are using a blockingQueue to exchange
+ * information.
+ *
+ * This class will also take care of moving all existing
+ * done dataSink files (.done) and any dataSink file that
+ * has not been changed for at least (rotatePeriod+2min).
+ *
+ */
+public class LocalToRemoteHdfsMover extends Thread {
+ static Logger log = Logger.getLogger(LocalToRemoteHdfsMover.class);
+
+ private FileSystem remoteFs = null;
+ private FileSystem localFs = null;
+ private Configuration conf = null;
+ private String fsname = null;
+ private String localOutputDir = null;
+ private String remoteOutputDir = null;
+ private boolean exitIfHDFSNotavailable = false;
+ private BlockingQueue<String> fileQueue = null;
+ private volatile boolean isRunning = true;
+
+ public LocalToRemoteHdfsMover(BlockingQueue<String> fileQueue ,Configuration conf) {
+ this.fileQueue = fileQueue;
+ this.conf = conf;
+ this.setDaemon(true);
+ this.setName("LocalToRemoteHdfsMover");
+ this.start();
+ }
+
+ protected void init() throws Throwable {
+
+ // check if they've told us the file system to use
+ fsname = conf.get("writer.hdfs.filesystem");
+ if (fsname == null || fsname.equals("")) {
+ // otherwise try to get the filesystem from hadoop
+ fsname = conf.get("fs.default.name");
+ }
+
+ if (fsname == null) {
+ log.error("no filesystem name");
+ throw new RuntimeException("no filesystem");
+ }
+
+ log.info("remote fs name is " + fsname);
+ exitIfHDFSNotavailable = conf.getBoolean(
+ "localToRemoteHdfsMover.exitIfHDFSNotavailable", false);
+
+ remoteFs = FileSystem.get(new URI(fsname), conf);
+ if (remoteFs == null && exitIfHDFSNotavailable) {
+ log.error("can't connect to HDFS at " + remoteFs.getUri() + " bail out!");
+ DaemonWatcher.bailout(-1);
+ }
+
+ localFs = FileSystem.getLocal(conf);
+
+ remoteOutputDir = conf.get("chukwaCollector.outputDir", "/chukwa/logs/");
+ if (!remoteOutputDir.endsWith("/")) {
+ remoteOutputDir += "/";
+ }
+
+ localOutputDir = conf.get("chukwaCollector.localOutputDir",
+ "/chukwa/datasink/");
+ if (!localOutputDir.endsWith("/")) {
+ localOutputDir += "/";
+ }
+
+ }
+
+ protected void moveFile(String filePath) throws Exception{
+ String remoteFilePath = filePath.substring(filePath.lastIndexOf("/")+1,filePath.lastIndexOf("."));
+ remoteFilePath = remoteOutputDir + remoteFilePath;
+ try {
+ Path pLocalPath = new Path(filePath);
+ Path pRemoteFilePath = new Path(remoteFilePath + ".chukwa");
+ remoteFs.copyFromLocalFile(false, true, pLocalPath, pRemoteFilePath);
+ Path pFinalRemoteFilePath = new Path(remoteFilePath + ".done");
+ if ( remoteFs.rename(pRemoteFilePath, pFinalRemoteFilePath)) {
+ localFs.delete(pLocalPath,false);
+ log.info("move done deleting from local: " + pLocalPath);
+ } else {
+ throw new RuntimeException("Cannot rename remote file, " + pRemoteFilePath + " to " + pFinalRemoteFilePath);
+ }
+ }catch (Exception e) {
+ log.warn("Cannot copy to the remote HDFS",e);
+ throw e;
+ }
+ }
+
+ protected void cleanup() throws Exception{
+ try {
+ int rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
+ 1000 * 60 * 5);// defaults to 5 minutes
+
+ Path pLocalOutputDir = new Path(localOutputDir);
+ FileStatus[] files = localFs.listStatus(pLocalOutputDir);
+ String fileName = null;
+ for (FileStatus file: files) {
+ fileName = file.getPath().getName();
+ if (fileName.endsWith(".done")) {
+ moveFile(localOutputDir + fileName);
+ } else if (fileName.endsWith(".chukwa")) {
+ long lastPeriod = System.currentTimeMillis() - rotateInterval - (2*60*1000);
+ if (file.getModificationTime() < lastPeriod) {
+ log.info("Moving .chukwa file over, " + localOutputDir + fileName);
+ moveFile(localOutputDir + fileName);
+ }
+ }
+ }
+ }catch (Exception e) {
+ log.warn("Cannot copy to the remote HDFS",e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void run() {
+ boolean inError = true;
+ String filePath = null;
+
+ while (isRunning) {
+ try {
+ if (inError) {
+ init();
+ cleanup();
+ inError = false;
+ }
+
+ if (filePath == null) {
+ filePath = fileQueue.take();
+ }
+ if (filePath == null) {
+ continue;
+ }
+
+ moveFile(filePath);
+ cleanup();
+ filePath = null;
+
+ } catch (Throwable e) {
+ log.warn("Error in LocalToHdfsMover", e);
+ inError = true;
+ try {
+ log.info("Got an exception going to sleep for 60 secs");
+ Thread.sleep(60000);
+ } catch (Throwable e2) {
+ log.warn("Exception while sleeping", e2);
+ }
+ }
+ }
+ log.info(Thread.currentThread().getName() + " is exiting.");
+ }
+
+ public void shutdown() {
+ this.isRunning = false;
+ }
+}
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java?rev=778911&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java Tue May 26 22:14:51 2009
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer.localfs;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+
+/**
+ * <p>This class <b>is</b> thread-safe -- rotate() and save() both synchronize on
+ * lock object.
+ * </p>
+ * <p>
+ * Write data to a local fileSystem then move it to the remote HDFS
+ * <br>
+ * Warning:
+ * <br>
+ * There's no lock/waiting time for the remote client.
+ * The connection is released as soon as the last append is done,
+ * so therefore there is no guarantee that this class will not loose
+ * any data.
+ * <br>
+ * This class has been designed this way for performance reason.
+ * </p>
+ * <p>
+ * In order to use this class, you need to define some parameters,
+ * in chukwa-collector-conf.xml
+ * <p>
+ * <br>
+ * <property><br>
+ * <name>chukwaCollector.localOutputDir</name><br>
+ * <value>/grid/0/gs/chukwa/chukwa-0.1.2/dataSink/</value><br>
+ * <description>Chukwa data sink directory</description><br>
+ * </property><br>
+ *<br>
+ * <property><br>
+ * <name>chukwaCollector.writerClass</name><br>
+ * <value>org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter</value><br>
+ * <description>Local chukwa writer</description><br>
+ * </property><br>
+ * <br>
+ */
+public class LocalWriter implements ChukwaWriter {
+
+ static Logger log = Logger.getLogger(LocalWriter.class);
+ static final int STAT_INTERVAL_SECONDS = 30;
+ static String localHostAddr = null;
+
+ private final Object lock = new Object();
+ private BlockingQueue<String> fileQueue = null;
+ @SuppressWarnings("unused")
+ private LocalToRemoteHdfsMover localToRemoteHdfsMover = null;
+ private FileSystem fs = null;
+ private Configuration conf = null;
+
+ private String localOutputDir = null;
+ private Calendar calendar = Calendar.getInstance();
+
+ private Path currentPath = null;
+ private String currentFileName = null;
+ private FSDataOutputStream currentOutputStr = null;
+ private SequenceFile.Writer seqFileWriter = null;
+ private int rotateInterval = 1000 * 60;
+
+
+ private volatile long dataSize = 0;
+ private volatile boolean isRunning = false;
+
+ private Timer rotateTimer = null;
+ private Timer statTimer = null;
+
+
+ private int initWriteChunkRetries = 10;
+ private int writeChunkRetries = initWriteChunkRetries;
+ private boolean chunksWrittenThisRotate = false;
+
+ private long timePeriod = -1;
+ private long nextTimePeriodComputation = -1;
+
+ static {
+ try {
+ localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
+ } catch (UnknownHostException e) {
+ localHostAddr = "-NA-";
+ }
+ }
+
+ public void init(Configuration conf) throws WriterException {
+ this.conf = conf;
+
+ try {
+ fs = FileSystem.getLocal(conf);
+ localOutputDir = conf.get("chukwaCollector.localOutputDir",
+ "/chukwa/datasink/");
+ if (!localOutputDir.endsWith("/")) {
+ localOutputDir += "/";
+ }
+ Path pLocalOutputDir = new Path(localOutputDir);
+ if (!fs.exists(pLocalOutputDir)) {
+ boolean exist = fs.mkdirs(pLocalOutputDir);
+ if (!exist) {
+ throw new WriterException("Cannot create local dataSink dir: "
+ + localOutputDir);
+ }
+ } else {
+ FileStatus fsLocalOutputDir = fs.getFileStatus(pLocalOutputDir);
+ if (!fsLocalOutputDir.isDir()) {
+ throw new WriterException("local dataSink dir is not a directory: "
+ + localOutputDir);
+ }
+ }
+ } catch (Throwable e) {
+ log.fatal("Cannot initialize LocalWriter", e);
+ DaemonWatcher.bailout(-1);
+ }
+
+
+
+
+ rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
+ 1000 * 60 * 5);// defaults to 5 minutes
+
+ initWriteChunkRetries = conf
+ .getInt("chukwaCollector.writeChunkRetries", 10);
+ writeChunkRetries = initWriteChunkRetries;
+
+ // check if they've told us the file system to use
+ log.info("rotateInterval is " + rotateInterval);
+ log.info("outputDir is " + localOutputDir);
+ log.info("localFileSystem is " + fs.getUri().toString());
+
+ // Setup everything by rotating
+ rotate();
+
+ rotateTimer = new Timer();
+ rotateTimer.schedule(new RotateTask(), rotateInterval,
+ rotateInterval);
+
+ statTimer = new Timer();
+ statTimer.schedule(new StatReportingTask(), 1000,
+ STAT_INTERVAL_SECONDS * 1000);
+
+ fileQueue = new LinkedBlockingQueue<String>();
+ localToRemoteHdfsMover = new LocalToRemoteHdfsMover(fileQueue, conf);
+
+ }
+
+ private class RotateTask extends TimerTask {
+ public void run() {
+ rotate();
+ };
+ }
+
+ private class StatReportingTask extends TimerTask {
+ private long lastTs = System.currentTimeMillis();
+
+ public void run() {
+
+ long time = System.currentTimeMillis();
+ long currentDs = dataSize;
+ dataSize = 0;
+
+ long interval = time - lastTs;
+ lastTs = time;
+
+ long dataRate = 1000 * currentDs / interval; // kb/sec
+ log.info("stat:datacollection.writer.local.LocalWriter dataSize="
+ + currentDs + " dataRate=" + dataRate);
+ }
+ };
+
+ protected void computeTimePeriod() {
+ synchronized (calendar) {
+ calendar.setTimeInMillis(System.currentTimeMillis());
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+ timePeriod = calendar.getTimeInMillis();
+ calendar.add(Calendar.HOUR, 1);
+ nextTimePeriodComputation = calendar.getTimeInMillis();
+ }
+ }
+
+
+ /**
+ * Best effort, there's no guarantee that chunks
+ * have really been written to disk
+ */
+ public void add(List<Chunk> chunks) throws WriterException {
+ if (!isRunning) {
+ throw new WriterException("Writer not yet ready");
+ }
+ long now = System.currentTimeMillis();
+ if (chunks != null) {
+ try {
+ chunksWrittenThisRotate = true;
+ ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+ synchronized (lock) {
+ if (System.currentTimeMillis() >= nextTimePeriodComputation) {
+ computeTimePeriod();
+ }
+
+ for (Chunk chunk : chunks) {
+ archiveKey.setTimePartition(timePeriod);
+ archiveKey.setDataType(chunk.getDataType());
+ archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
+ + "/" + chunk.getStreamName());
+ archiveKey.setSeqId(chunk.getSeqID());
+
+ if (chunk != null) {
+ seqFileWriter.append(archiveKey, chunk);
+ // compute size for stats
+ dataSize += chunk.getData().length;
+ }
+ }
+ }// End synchro
+ long end = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug("duration=" + (end-now) + " size=" + chunks.size());
+ }
+
+ } catch (IOException e) {
+ writeChunkRetries--;
+ log.error("Could not save the chunk. ", e);
+
+ if (writeChunkRetries < 0) {
+ log
+ .fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
+ DaemonWatcher.bailout(-1);
+ }
+ throw new WriterException(e);
+ }
+ }
+ }
+
+ protected void rotate() {
+ isRunning = true;
+ calendar.setTimeInMillis(System.currentTimeMillis());
+ log.info("start Date [" + calendar.getTime() + "]");
+ log.info("Rotate from " + Thread.currentThread().getName());
+
+ String newName = new java.text.SimpleDateFormat("yyyyddHHmmssSSS")
+ .format(calendar.getTime());
+ newName += localHostAddr + new java.rmi.server.UID().toString();
+ newName = newName.replace("-", "");
+ newName = newName.replace(":", "");
+ newName = newName.replace(".", "");
+ newName = localOutputDir + "/" + newName.trim();
+
+ synchronized (lock) {
+ try {
+ FSDataOutputStream previousOutputStr = currentOutputStr;
+ Path previousPath = currentPath;
+ String previousFileName = currentFileName;
+
+ if (previousOutputStr != null) {
+ previousOutputStr.close();
+ if (chunksWrittenThisRotate) {
+ fs.rename(previousPath, new Path(previousFileName + ".done"));
+ fileQueue.add(previousFileName + ".done");
+ } else {
+ log.info("no chunks written to " + previousPath + ", deleting");
+ fs.delete(previousPath, false);
+ }
+ }
+ Path newOutputPath = new Path(newName + ".chukwa");
+ FSDataOutputStream newOutputStr = fs.create(newOutputPath);
+
+ currentOutputStr = newOutputStr;
+ currentPath = newOutputPath;
+ currentFileName = newName;
+ chunksWrittenThisRotate = false;
+ // Uncompressed for now
+ seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
+ ChukwaArchiveKey.class, ChunkImpl.class,
+ SequenceFile.CompressionType.NONE, null);
+
+ } catch (IOException e) {
+ log.fatal("IO Exception in rotate. Exiting!", e);
+ // Shutting down the collector
+ // Watchdog will re-start it automatically
+ DaemonWatcher.bailout(-1);
+ }
+ }
+
+ log.debug("finished rotate()");
+ }
+
+ public void close() {
+ synchronized (lock) {
+
+ if (rotateTimer != null) {
+ rotateTimer.cancel();
+ }
+
+ if (statTimer != null) {
+ statTimer.cancel();
+ }
+
+ try {
+ if (this.currentOutputStr != null) {
+ this.currentOutputStr.close();
+
+ if (seqFileWriter != null) {
+ seqFileWriter.close();
+ }
+ }
+ if (localToRemoteHdfsMover != null) {
+ localToRemoteHdfsMover.shutdown();
+ }
+
+ fs.rename(currentPath, new Path(currentFileName + ".done"));
+ } catch (IOException e) {
+ log.error("failed to close and rename stream", e);
+ }
+ }
+ }
+}
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java?rev=778911&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java Tue May 26 22:14:51 2009
@@ -0,0 +1,160 @@
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.io.File;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkBuilder;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+
+
+
+public class TestChukwaWriters extends TestCase {
+
+  /**
+   * Writes the same chunks through SeqFileWriter and LocalWriter and checks
+   * that both produce sequence files with identical contents.
+   */
+  public void testWriters() throws Throwable {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    ChukwaWriter seqWriter = new SeqFileWriter();
+    ChukwaWriter localWriter = new LocalWriter();
+
+    List<Chunk> chunksSeqWriter = new LinkedList<Chunk>();
+    List<Chunk> chunksLocalWriter = new LinkedList<Chunk>();
+    for (int i = 0; i < 10; i++) {
+      chunksSeqWriter.add(buildChunk(i));
+      chunksLocalWriter.add(buildChunk(i));
+    }
+
+    File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
+    if (!tempDir.exists()) {
+      tempDir.mkdirs();
+    }
+
+    String outputDirectory = tempDir.getPath() + "/testChukwaWriters_JB_" + System.currentTimeMillis() + "/";
+
+    try {
+      Configuration confSeqWriter = new Configuration();
+      confSeqWriter.set("chukwaCollector.rotateInterval", "300000");
+      confSeqWriter.set("writer.hdfs.filesystem", "file:///");
+      String seqWriterOutputDir = outputDirectory + "/seqWriter/seqOutputDir";
+      confSeqWriter.set("chukwaCollector.outputDir", seqWriterOutputDir);
+
+      seqWriter.init(confSeqWriter);
+      Thread.sleep(5000);
+      seqWriter.add(chunksSeqWriter);
+      seqWriter.close();
+
+      String seqWriterFile = findDoneFile(seqWriterOutputDir);
+      Assert.assertNotNull("no .done file in " + seqWriterOutputDir, seqWriterFile);
+      String seqWriterDump = dumpArachive(fs, conf, seqWriterFile);
+
+      Configuration confLocalWriter = new Configuration();
+      confLocalWriter.set("writer.hdfs.filesystem", "file:///");
+      // Keep anything the mover copies inside the test directory as well.
+      confLocalWriter.set("chukwaCollector.outputDir", outputDirectory + "/localWriter/remoteOutputDir");
+      String localWriterOutputDir = outputDirectory + "/localWriter/localOutputDir";
+      confLocalWriter.set("chukwaCollector.localOutputDir", localWriterOutputDir);
+      confLocalWriter.set("chukwaCollector.rotateInterval", "300000");
+
+      localWriter.init(confLocalWriter);
+      Thread.sleep(5000);
+      localWriter.add(chunksLocalWriter);
+      localWriter.close();
+
+      String localWriterFile = findDoneFile(localWriterOutputDir);
+      Assert.assertNotNull("no .done file in " + localWriterOutputDir, localWriterFile);
+      String localWriterDump = dumpArachive(fs, conf, localWriterFile);
+
+      Assert.assertTrue("empty archive " + seqWriterFile, seqWriterDump.length() > 0);
+      Assert.assertEquals(seqWriterDump, localWriterDump);
+    } finally {
+      // File.delete() cannot remove a non-empty directory; delete recursively.
+      fs.delete(new Path(outputDirectory), true);
+    }
+  }
+
+  /** Builds a chunk holding the records "record-i", "foo", "bar" and "baz". */
+  private static Chunk buildChunk(int i) {
+    ChunkBuilder cb = new ChunkBuilder();
+    cb.addRecord(("record-" + i).getBytes());
+    cb.addRecord("foo".getBytes());
+    cb.addRecord("bar".getBytes());
+    cb.addRecord("baz".getBytes());
+    return cb.getChunk();
+  }
+
+  /** Returns the path of a .done file in dir, or null if there is none. */
+  private static String findDoneFile(String dir) {
+    String[] files = new File(dir).list();
+    Assert.assertNotNull("cannot list " + dir, files);
+    for (String file : files) {
+      if (file.endsWith(".done")) {
+        return dir + File.separator + file;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Renders every (key, chunk) record of a Chukwa sequence file as text.
+   *
+   * @param fs file system holding the file
+   * @param conf configuration for the reader
+   * @param file path of the sequence file
+   * @return the concatenated dump of all records; empty if there are none
+   * @throws Throwable if the file cannot be opened or read
+   */
+  protected String dumpArachive(FileSystem fs, Configuration conf, String file) throws Throwable {
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(file), conf);
+    try {
+      ChukwaArchiveKey key = new ChukwaArchiveKey();
+      ChunkImpl chunk = ChunkImpl.getBlankChunk();
+
+      StringBuilder sb = new StringBuilder();
+      while (reader.next(key, chunk)) {
+        sb.append("\nTimePartition: " + key.getTimePartition());
+        sb.append("DataType: " + key.getDataType());
+        sb.append("StreamName: " + key.getStreamName());
+        sb.append("SeqId: " + key.getSeqId());
+        sb.append("\t\t =============== ");
+
+        sb.append("Cluster : " + chunk.getTags());
+        sb.append("DataType : " + chunk.getDataType());
+        sb.append("Source : " + chunk.getSource());
+        sb.append("Application : " + chunk.getApplication());
+        sb.append("SeqID : " + chunk.getSeqID());
+        sb.append("Data : " + new String(chunk.getData()));
+      }
+      // Return only after all records are read, not just the first one.
+      return sb.toString();
+    } finally {
+      reader.close();
+    }
+  }
+}