Posted to commits@chukwa.apache.org by as...@apache.org on 2009/05/27 00:14:52 UTC

svn commit: r778911 - in /hadoop/chukwa/trunk/src: java/org/apache/hadoop/chukwa/datacollection/writer/localfs/ test/org/apache/hadoop/chukwa/datacollection/writer/

Author: asrabkin
Date: Tue May 26 22:14:51 2009
New Revision: 778911

URL: http://svn.apache.org/viewvc?rev=778911&view=rev
Log:
CHUKWA-30. Missed some files.

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java?rev=778911&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java Tue May 26 22:14:51 2009
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer.localfs;
+
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class is used by {@link LocalWriter}.
+ * 
+ * Its only role is to move dataSink files from the local
+ * file system to the remote HDFS.
+ * 
+ * The two classes exchange file names over a BlockingQueue.
+ * 
+ * This class also takes care of moving any existing completed
+ * dataSink files (.done) and any dataSink file that has not
+ * been modified for at least (rotateInterval + 2 minutes).
+ * 
+ */
+public class LocalToRemoteHdfsMover extends Thread {
+  static Logger log = Logger.getLogger(LocalToRemoteHdfsMover.class);
+
+  private FileSystem remoteFs = null;
+  private FileSystem localFs = null;
+  private Configuration conf = null;
+  private String fsname = null;
+  private String localOutputDir = null;
+  private String remoteOutputDir = null;
+  private boolean exitIfHDFSNotavailable = false;
+  private BlockingQueue<String> fileQueue = null;
+  private volatile boolean isRunning = true;
+  
+  public LocalToRemoteHdfsMover(BlockingQueue<String> fileQueue, Configuration conf) {
+    this.fileQueue = fileQueue;
+    this.conf = conf;
+    this.setDaemon(true);
+    this.setName("LocalToRemoteHdfsMover");
+    this.start();
+  }
+
+  protected void init() throws Throwable {
+
+    // check if they've told us the file system to use
+    fsname = conf.get("writer.hdfs.filesystem");
+    if (fsname == null || fsname.equals("")) {
+      // otherwise try to get the filesystem from hadoop
+      fsname = conf.get("fs.default.name");
+    }
+
+    if (fsname == null) {
+      log.error("no filesystem name");
+      throw new RuntimeException("no filesystem");
+    }
+
+    log.info("remote fs name is " + fsname);
+    exitIfHDFSNotavailable = conf.getBoolean(
+        "localToRemoteHdfsMover.exitIfHDFSNotavailable", false);
+
+    remoteFs = FileSystem.get(new URI(fsname), conf);
+    if (remoteFs == null && exitIfHDFSNotavailable) {
+      log.error("can't connect to HDFS at " + fsname + ", bailing out!");
+      DaemonWatcher.bailout(-1);
+    }
+    
+    localFs = FileSystem.getLocal(conf);
+    
+    remoteOutputDir = conf.get("chukwaCollector.outputDir", "/chukwa/logs/");
+    if (!remoteOutputDir.endsWith("/")) {
+      remoteOutputDir += "/";
+    }
+    
+    localOutputDir = conf.get("chukwaCollector.localOutputDir",
+    "/chukwa/datasink/");
+    if (!localOutputDir.endsWith("/")) {
+      localOutputDir += "/";
+    }
+    
+  }
+
+  protected void moveFile(String filePath) throws Exception {
+    String remoteFilePath = filePath.substring(filePath.lastIndexOf("/")+1,filePath.lastIndexOf("."));
+    remoteFilePath = remoteOutputDir + remoteFilePath;
+    try {
+      Path pLocalPath = new Path(filePath);
+      Path pRemoteFilePath = new Path(remoteFilePath + ".chukwa");
+      remoteFs.copyFromLocalFile(false, true, pLocalPath, pRemoteFilePath);
+      Path pFinalRemoteFilePath = new Path(remoteFilePath + ".done");
+      if ( remoteFs.rename(pRemoteFilePath, pFinalRemoteFilePath)) {
+        localFs.delete(pLocalPath,false);
+        log.info("move done deleting from local: " + pLocalPath);
+      } else {
+        throw new RuntimeException("Cannot rename remote file, " + pRemoteFilePath + " to " + pFinalRemoteFilePath);
+      }
+    } catch (Exception e) {
+      log.warn("Cannot copy to the remote HDFS", e);
+      throw e;
+    }
+  }
+  
+  protected void cleanup() throws Exception {
+    try {
+      int rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
+          1000 * 60 * 5);// defaults to 5 minutes
+      
+      Path pLocalOutputDir = new Path(localOutputDir);
+      FileStatus[] files = localFs.listStatus(pLocalOutputDir);
+      String fileName = null;
+      for (FileStatus file: files) {
+        fileName = file.getPath().getName();
+        if (fileName.endsWith(".done")) {
+          moveFile(localOutputDir + fileName);
+        } else if (fileName.endsWith(".chukwa")) {
+          long lastPeriod = System.currentTimeMillis() - rotateInterval - (2*60*1000);
+          if (file.getModificationTime() < lastPeriod) {
+            log.info("Moving .chukwa file over, " + localOutputDir + fileName);
+            moveFile(localOutputDir + fileName);
+          }
+        }
+      }
+    } catch (Exception e) {
+      log.warn("Cannot copy to the remote HDFS", e);
+      throw e;
+    }
+  }
+  
+  @Override
+  public void run() {
+    boolean inError = true;
+    String filePath = null;
+    
+    while (isRunning) {
+      try {
+        if (inError) {
+          init();
+          cleanup();
+          inError = false;
+        }
+        
+        if (filePath == null) {
+          // blocks until a file is available; keeps the current file
+          // across retries after an error (take() never returns null)
+          filePath = fileQueue.take();
+        }
+        
+        moveFile(filePath);
+        cleanup();
+        filePath = null;
+        
+      } catch (Throwable e) {
+        log.warn("Error in LocalToRemoteHdfsMover", e);
+        inError = true;
+        try {
+          log.info("Got an exception, going to sleep for 60 secs");
+          Thread.sleep(60000);
+        } catch (Throwable e2) {
+          log.warn("Exception while sleeping", e2);
+        }
+      }
+    }
+    log.info(Thread.currentThread().getName() + " is exiting.");
+  }
+
+  public void shutdown() {
+    this.isRunning = false;
+  }
+}
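The handoff described in the class comment above is a plain producer/consumer exchange over a java.util.concurrent.BlockingQueue. A minimal, self-contained sketch of that pattern (hypothetical names, not code from this commit):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch of the handoff: the main thread stands in for LocalWriter
    // (which enqueues finished .done file names on rotate); the daemon
    // thread stands in for LocalToRemoteHdfsMover (which blocks on take()).
    public class HandoffSketch {
      public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> fileQueue = new LinkedBlockingQueue<String>();

        Thread mover = new Thread("mover-sketch") {
          public void run() {
            try {
              while (true) {
                String path = fileQueue.take(); // blocks until a file is queued
                System.out.println("would copy " + path + " to HDFS");
              }
            } catch (InterruptedException e) {
              // interrupted: exit the loop
            }
          }
        };
        mover.setDaemon(true);
        mover.start();

        // producer side: hand over a finished dataSink file
        fileQueue.add("/chukwa/datasink/20090526220000000_example.done");
        Thread.sleep(500); // give the daemon thread a moment before exiting
      }
    }

Because take() blocks, the real mover thread only wakes up when LocalWriter finishes a file (or when the startup cleanup() pass finds leftovers).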

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java?rev=778911&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java Tue May 26 22:14:51 2009
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer.localfs;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+
+/**
+ * <p>This class <b>is</b> thread-safe -- rotate() and add() both synchronize
+ * on a lock object.
+ * </p>
+ * <p>
+ * It writes data to the local file system, then moves it to the remote HDFS.
+ * <br>
+ * Warning:
+ * <br>
+ * There is no locking of, or waiting on, the remote side.
+ * The connection is released as soon as the last append is done,
+ * so there is no guarantee that this class will not lose
+ * any data.
+ * <br>
+ * The class has been designed this way for performance reasons.
+ * </p>
+ * <p>
+ * In order to use this class, you need to set the following parameters
+ * in chukwa-collector-conf.xml:
+ * <p>
+ * <br>
+ *  &lt;property&gt;<br>
+ *   &lt;name&gt;chukwaCollector.localOutputDir&lt;/name&gt;<br>
+ *   &lt;value&gt;/grid/0/gs/chukwa/chukwa-0.1.2/dataSink/&lt;/value&gt;<br>
+ *   &lt;description&gt;Chukwa data sink directory&lt;/description&gt;<br>
+ *  &lt;/property&gt;<br>
+ *<br>
+ *  &lt;property&gt;<br>
+ *    &lt;name&gt;chukwaCollector.writerClass&lt;/name&gt;<br>
+ *    &lt;value&gt;org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter&lt;/value&gt;<br>
+ *    &lt;description&gt;Local chukwa writer&lt;/description&gt;<br>
+ *  &lt;/property&gt;<br>
+ * <br>
+ */
+public class LocalWriter implements ChukwaWriter {
+
+  static Logger log = Logger.getLogger(LocalWriter.class);
+  static final int STAT_INTERVAL_SECONDS = 30;
+  static String localHostAddr = null;
+
+  private final Object lock = new Object();
+  private BlockingQueue<String> fileQueue = null;
+  @SuppressWarnings("unused")
+  private LocalToRemoteHdfsMover localToRemoteHdfsMover = null;
+  private FileSystem fs = null;
+  private Configuration conf = null;
+
+  private String localOutputDir = null;
+  private Calendar calendar = Calendar.getInstance();
+
+  private Path currentPath = null;
+  private String currentFileName = null;
+  private FSDataOutputStream currentOutputStr = null;
+  private SequenceFile.Writer seqFileWriter = null;
+  private int rotateInterval = 1000 * 60;
+
+ 
+  private volatile long dataSize = 0;
+  private volatile boolean isRunning = false;
+  
+  private Timer rotateTimer = null;
+  private Timer statTimer = null;
+  
+  
+  private int initWriteChunkRetries = 10;
+  private int writeChunkRetries = initWriteChunkRetries;
+  private boolean chunksWrittenThisRotate = false;
+
+  private long timePeriod = -1;
+  private long nextTimePeriodComputation = -1;
+
+  static {
+    try {
+      localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
+    } catch (UnknownHostException e) {
+      localHostAddr = "-NA-";
+    }
+  }
+
+  public void init(Configuration conf) throws WriterException {
+    this.conf = conf;
+
+    try {
+      fs = FileSystem.getLocal(conf);
+      localOutputDir = conf.get("chukwaCollector.localOutputDir",
+          "/chukwa/datasink/");
+      if (!localOutputDir.endsWith("/")) {
+        localOutputDir += "/";
+      }
+      Path pLocalOutputDir = new Path(localOutputDir);
+      if (!fs.exists(pLocalOutputDir)) {
+        boolean exist = fs.mkdirs(pLocalOutputDir);
+        if (!exist) {
+          throw new WriterException("Cannot create local dataSink dir: "
+              + localOutputDir);
+        }
+      } else {
+        FileStatus fsLocalOutputDir = fs.getFileStatus(pLocalOutputDir);
+        if (!fsLocalOutputDir.isDir()) {
+          throw new WriterException("local dataSink dir is not a directory: "
+              + localOutputDir);
+        }
+      }
+    } catch (Throwable e) {
+      log.fatal("Cannot initialize LocalWriter", e);
+      DaemonWatcher.bailout(-1);
+    }
+
+    rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
+        1000 * 60 * 5);// defaults to 5 minutes
+   
+    initWriteChunkRetries = conf
+        .getInt("chukwaCollector.writeChunkRetries", 10);
+    writeChunkRetries = initWriteChunkRetries;
+
+    log.info("rotateInterval is " + rotateInterval);
+    log.info("outputDir is " + localOutputDir);
+    log.info("localFileSystem is " + fs.getUri().toString());
+
+    // Setup everything by rotating
+    rotate();
+
+    rotateTimer = new Timer();
+    rotateTimer.schedule(new RotateTask(), rotateInterval,
+        rotateInterval);
+    
+    statTimer = new Timer();
+    statTimer.schedule(new StatReportingTask(), 1000,
+        STAT_INTERVAL_SECONDS * 1000);
+
+    fileQueue = new LinkedBlockingQueue<String>();
+    localToRemoteHdfsMover = new LocalToRemoteHdfsMover(fileQueue, conf);
+    
+  }
+
+  private class RotateTask extends TimerTask {
+    public void run() {
+      rotate();
+    }
+  }
+  
+  private class StatReportingTask extends TimerTask {
+    private long lastTs = System.currentTimeMillis();
+
+    public void run() {
+
+      long time = System.currentTimeMillis();
+      long currentDs = dataSize;
+      dataSize = 0;
+
+      long interval = time - lastTs;
+      lastTs = time;
+
+      long dataRate = 1000 * currentDs / interval; // bytes/sec
+      log.info("stat:datacollection.writer.local.LocalWriter dataSize="
+          + currentDs + " dataRate=" + dataRate);
+    }
+  }
+
+  protected void computeTimePeriod() {
+    synchronized (calendar) {
+      calendar.setTimeInMillis(System.currentTimeMillis());
+      calendar.set(Calendar.MINUTE, 0);
+      calendar.set(Calendar.SECOND, 0);
+      calendar.set(Calendar.MILLISECOND, 0);
+      timePeriod = calendar.getTimeInMillis();
+      calendar.add(Calendar.HOUR, 1);
+      nextTimePeriodComputation = calendar.getTimeInMillis();
+    }
+  }
+
+
+  /**
+   * Best effort: there is no guarantee that chunks
+   * have actually been written to disk.
+   */
+  public void add(List<Chunk> chunks) throws WriterException {
+    if (!isRunning) {
+      throw new WriterException("Writer not yet ready");
+    }
+    long now = System.currentTimeMillis();
+    if (chunks != null) {
+      try {
+        chunksWrittenThisRotate = true;
+        ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+        synchronized (lock) {
+          if (System.currentTimeMillis() >= nextTimePeriodComputation) {
+            computeTimePeriod();
+          }
+
+          for (Chunk chunk : chunks) {
+            if (chunk == null) {
+              continue;
+            }
+            archiveKey.setTimePartition(timePeriod);
+            archiveKey.setDataType(chunk.getDataType());
+            archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
+                + "/" + chunk.getStreamName());
+            archiveKey.setSeqId(chunk.getSeqID());
+
+            seqFileWriter.append(archiveKey, chunk);
+            // compute size for stats
+            dataSize += chunk.getData().length;
+          }
+        } // end synchronized block
+        long end = System.currentTimeMillis();
+        if (log.isDebugEnabled()) {
+          log.debug("duration=" + (end-now) + " size=" + chunks.size());
+        }
+        
+      } catch (IOException e) {
+        writeChunkRetries--;
+        log.error("Could not save the chunk. ", e);
+
+        if (writeChunkRetries < 0) {
+          log.fatal("Too many IOExceptions while trying to write a chunk; "
+              + "collector is going to exit!");
+          DaemonWatcher.bailout(-1);
+        }
+        throw new WriterException(e);
+      }
+    }
+  }
+
+  protected void rotate() {
+    isRunning = true;
+    calendar.setTimeInMillis(System.currentTimeMillis());
+    log.info("start Date [" + calendar.getTime() + "]");
+    log.info("Rotate from " + Thread.currentThread().getName());
+
+    String newName = new java.text.SimpleDateFormat("yyyyMMddHHmmssSSS")
+        .format(calendar.getTime());
+    newName += localHostAddr + new java.rmi.server.UID().toString();
+    newName = newName.replace("-", "");
+    newName = newName.replace(":", "");
+    newName = newName.replace(".", "");
+    newName = localOutputDir + newName.trim();
+
+    synchronized (lock) {
+      try {
+        FSDataOutputStream previousOutputStr = currentOutputStr;
+        Path previousPath = currentPath;
+        String previousFileName = currentFileName;
+
+        if (previousOutputStr != null) {
+          previousOutputStr.close();
+          if (chunksWrittenThisRotate) {
+            fs.rename(previousPath, new Path(previousFileName + ".done"));
+            fileQueue.add(previousFileName + ".done");
+          } else {
+            log.info("no chunks written to " + previousPath + ", deleting");
+            fs.delete(previousPath, false);
+          }
+        }
+        Path newOutputPath = new Path(newName + ".chukwa");
+        FSDataOutputStream newOutputStr = fs.create(newOutputPath);
+        
+        currentOutputStr = newOutputStr;
+        currentPath = newOutputPath;
+        currentFileName = newName;
+        chunksWrittenThisRotate = false;
+        // Uncompressed for now
+        seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
+            ChukwaArchiveKey.class, ChunkImpl.class,
+            SequenceFile.CompressionType.NONE, null);
+
+      } catch (IOException e) {
+        log.fatal("IO Exception in rotate. Exiting!", e);
+        // Shutting down the collector
+        // Watchdog will re-start it automatically
+        DaemonWatcher.bailout(-1);
+      }
+    }
+ 
+    log.debug("finished rotate()");
+  }
+
+  public void close() {
+    synchronized (lock) {
+  
+      if (rotateTimer != null) {
+        rotateTimer.cancel();
+      }
+
+      if (statTimer != null) {
+        statTimer.cancel();
+      }
+
+      try {
+        if (this.currentOutputStr != null) {
+          this.currentOutputStr.close();
+
+          if (seqFileWriter != null) {
+            seqFileWriter.close();
+          }
+        }
+        if (localToRemoteHdfsMover != null) {
+          localToRemoteHdfsMover.shutdown();
+        }
+        
+        fs.rename(currentPath, new Path(currentFileName + ".done"));
+      } catch (IOException e) {
+        log.error("failed to close and rename stream", e);
+      }
+    }
+  }
+}
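Driving LocalWriter programmatically mirrors what TestChukwaWriters below does. A minimal sketch, with hypothetical paths, using only the configuration keys read by the two classes above:

    import java.util.LinkedList;
    import java.util.List;

    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.chukwa.ChunkBuilder;
    import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
    import org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter;
    import org.apache.hadoop.conf.Configuration;

    public class LocalWriterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hypothetical directories; rotate every 5 minutes
        conf.set("chukwaCollector.localOutputDir", "/tmp/chukwa/datasink/");
        conf.set("chukwaCollector.outputDir", "/tmp/chukwa/logs/");
        conf.set("chukwaCollector.rotateInterval", "300000");
        conf.set("writer.hdfs.filesystem", "file:///"); // "remote" fs for the mover

        ChukwaWriter writer = new LocalWriter();
        writer.init(conf); // starts the rotate timer and the mover thread

        ChunkBuilder cb = new ChunkBuilder();
        cb.addRecord("hello chukwa".getBytes());
        List<Chunk> chunks = new LinkedList<Chunk>();
        chunks.add(cb.getChunk());

        writer.add(chunks); // appended to the current local .chukwa sink file
        writer.close();     // closes and renames the current sink file to .done
      }
    }

Note that it is rotate(), not close(), that puts finished .done files on the queue for LocalToRemoteHdfsMover to ship to HDFS.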

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java?rev=778911&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java Tue May 26 22:14:51 2009
@@ -0,0 +1,160 @@
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.io.File;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkBuilder;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+public class TestChukwaWriters extends TestCase {
+
+  public void testWriters() {
+    try {
+      
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.getLocal(conf);
+
+      
+      ChukwaWriter seqWriter = new SeqFileWriter();
+      ChukwaWriter localWriter = new LocalWriter();
+      
+      List<Chunk> chunksSeqWriter = new LinkedList<Chunk>();
+      List<Chunk> chunksLocalWriter = new LinkedList<Chunk>();
+      for(int i=0;i<10;i++) {
+        ChunkBuilder cb1 = new ChunkBuilder();
+        cb1.addRecord(("record-" +i) .getBytes());
+        cb1.addRecord("foo" .getBytes());
+        cb1.addRecord("bar".getBytes());
+        cb1.addRecord("baz".getBytes());
+        chunksSeqWriter.add(cb1.getChunk());
+        
+        ChunkBuilder cb2 = new ChunkBuilder();
+        cb2.addRecord(("record-" +i) .getBytes());
+        cb2.addRecord("foo" .getBytes());
+        cb2.addRecord("bar".getBytes());
+        cb2.addRecord("baz".getBytes());
+        chunksLocalWriter.add(cb2.getChunk());
+        
+      }
+      
+      File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
+      if (!tempDir.exists()) {
+        tempDir.mkdirs();
+      }
+      
+      String outputDirectory = tempDir.getPath() + "/testChukwaWriters_JB_" + System.currentTimeMillis() + "/";
+      
+      
+      Configuration confSeqWriter = new Configuration();
+      confSeqWriter.set("chukwaCollector.rotateInterval", "300000");
+      confSeqWriter.set("writer.hdfs.filesystem", "file:///");
+      String seqWriterOutputDir = outputDirectory + "/seqWriter/seqOutputDir";
+      confSeqWriter.set("chukwaCollector.outputDir", seqWriterOutputDir);
+      
+      seqWriter.init(confSeqWriter);
+      Thread.sleep(5000);
+      seqWriter.add(chunksSeqWriter);
+      seqWriter.close();
+      
+      String seqWriterFile = null;
+      
+      File directory = new File(seqWriterOutputDir);
+      String[] files = directory.list();
+      for (String file : files) {
+        if (file.endsWith(".done")) {
+          seqWriterFile = seqWriterOutputDir + File.separator + file;
+          break;
+        }
+      }
+      
+      Assert.assertNotNull(seqWriterFile);
+      
+      String seqWriterDump = dumpArchive(fs, conf, seqWriterFile);
+      
+      Configuration confLocalWriter = new Configuration();
+      confLocalWriter.set("writer.hdfs.filesystem", "file:///");
+      String localWriterOutputDir = outputDirectory + "/localWriter/localOutputDir";
+      confLocalWriter.set("chukwaCollector.localOutputDir", localWriterOutputDir);
+      confLocalWriter.set("chukwaCollector.rotateInterval", "300000");
+
+      
+      String localWriterFile = null;
+      localWriter.init(confLocalWriter);
+      Thread.sleep(5000);
+      localWriter.add(chunksLocalWriter);
+      localWriter.close();
+
+      directory = new File(localWriterOutputDir);
+      files = directory.list();
+      for (String file : files) {
+        if (file.endsWith(".done")) {
+          localWriterFile = localWriterOutputDir + File.separator + file;
+          break;
+        }
+      }
+      
+      Assert.assertNotNull(localWriterFile);
+      String localWriterDump = dumpArchive(fs, conf, localWriterFile);
+
+      Assert.assertEquals(seqWriterDump, localWriterDump);
+
+      // File.delete() fails silently on a non-empty directory;
+      // clean up recursively through the local FileSystem instead
+      fs.delete(new Path(outputDirectory), true);
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail("Exception in TestChukwaWriters," + e.getMessage());
+    }
+    
+  }
+  
+  protected String dumpArchive(FileSystem fs, Configuration conf, String file) throws Throwable {
+    SequenceFile.Reader reader = null;
+    try {
+      reader = new SequenceFile.Reader(fs, new Path(file), conf);
+
+      ChukwaArchiveKey key = new ChukwaArchiveKey();
+      ChunkImpl chunk = ChunkImpl.getBlankChunk();
+
+      StringBuilder sb = new StringBuilder();
+      while (reader.next(key, chunk)) {
+        sb.append("\nTimePartition: " + key.getTimePartition());
+        sb.append("DataType: " + key.getDataType());
+        sb.append("StreamName: " + key.getStreamName());
+        sb.append("SeqId: " + key.getSeqId());
+        sb.append("\t\t =============== ");
+
+        sb.append("Cluster : " + chunk.getTags());
+        sb.append("DataType : " + chunk.getDataType());
+        sb.append("Source : " + chunk.getSource());
+        sb.append("Application : " + chunk.getApplication());
+        sb.append("SeqID : " + chunk.getSeqID());
+        sb.append("Data : " + new String(chunk.getData()));
+      }
+      // return only after draining the reader, so the dump contains
+      // every record rather than just the first one
+      return sb.toString();
+    } catch (Throwable e) {
+      Assert.fail("Exception while reading SeqFile: " + e.getMessage());
+      throw e;
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+}
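To exercise the new test in isolation, the stock JUnit 3 text runner is enough; a hypothetical standalone entry point (build targets vary by setup):

    // Hypothetical runner for the test above; assumes JUnit 3 on the classpath.
    public class RunTestChukwaWriters {
      public static void main(String[] args) {
        junit.textui.TestRunner.run(
            org.apache.hadoop.chukwa.datacollection.writer.TestChukwaWriters.class);
      }
    }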