You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/12/22 21:08:57 UTC

svn commit: r893300 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/test/ src/java/org/apache/hadoop/chukwa/datacollection/writer/ src/test/org/apache/hadoop/chukwa/datacollection/adaptor/

Author: asrabkin
Date: Tue Dec 22 20:08:55 2009
New Revision: 893300

URL: http://svn.apache.org/viewvc?rev=893300&view=rev
Log:
CHUKWA-433. File-per-post writer

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=893300&r1=893299&r2=893300&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Tue Dec 22 20:08:55 2009
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-433. File-per-post writer for benchmark purposes. (asrabkin)
+
     CHUKWA-431. UDP Adaptor. (asrabkin)
 
     CHUKWA-46. Ability to allow only local connections to agent control port. (asrabkin)

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java?rev=893300&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java Tue Dec 22 20:08:55 2009
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.test;
+
+import java.io.IOException;
+import java.net.URI;
+
+
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.writer.*;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.mortbay.log.Log;
+
+/**
+ * A writer that writes a separate file for each post. Intended ONLY for
+ * architectural performance comparisons.  Do not use this in production.
+ *
+ * <p>Each call to {@link #add(List)} creates a new, uncompressed sequence file
+ * named {@code <outputDir>/<initTimeMillis>_<hostHash>_<n>.done}, appends the
+ * posted chunks to it through {@link SeqFileWriter#add(List)}, and closes it.
+ */
+public class FilePerPostWriter extends SeqFileWriter {
+
+  /** Common prefix of every output path; set by {@link #init}. */
+  String baseName;
+  /** Per-post sequence number appended to {@link #baseName}. */
+  AtomicLong counter = new AtomicLong(0);
+
+  /**
+   * Writes {@code chunks} into a brand-new sequence file and closes it.
+   *
+   * <p>Before delegating to {@code super.add}, this method points the
+   * inherited {@code currentOutputStr}, {@code currentPath},
+   * {@code currentFileName} and {@code seqFileWriter} fields at the new file,
+   * so it is synchronized: otherwise concurrent posts could append to, and
+   * close, each other's files. The object monitor is used rather than the
+   * inherited {@code lock}, because {@code super.add} may acquire that
+   * {@link Semaphore} itself and a semaphore is not reentrant.
+   *
+   * @param chunks the chunks to write
+   * @return the status reported by {@link SeqFileWriter#add(List)}
+   * @throws WriterException if the file cannot be created, written or closed
+   */
+  @Override
+  public synchronized CommitStatus add(List<Chunk> chunks) throws WriterException {
+    String newName = baseName + "_" + counter.incrementAndGet();
+    // NOTE(review): the file carries its final ".done" name while still being
+    // written (no write-then-rename); confirm nothing reads it concurrently.
+    Path newOutputPath = new Path(newName + ".done");
+    FSDataOutputStream newOutputStr = null;
+    try {
+      newOutputStr = fs.create(newOutputPath);
+      currentOutputStr = newOutputStr;
+      currentPath = newOutputPath;
+      currentFileName = newName;
+      // Uncompressed for now
+      seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
+          ChukwaArchiveKey.class, ChunkImpl.class,
+          SequenceFile.CompressionType.NONE, null);
+
+      CommitStatus status = super.add(chunks);
+      // A SequenceFile.Writer built over a caller-supplied stream only flushes
+      // that stream on close(), so the stream has to be closed as well;
+      // otherwise every post would leak an open file.
+      seqFileWriter.close();
+      newOutputStr.close();
+      newOutputStr = null; // closed cleanly: nothing left for the finally block
+      return status;
+    } catch (IOException e) {
+      throw new WriterException(e);
+    } finally {
+      if (newOutputStr != null) {
+        closeAfterFailure(newOutputStr);
+      }
+    }
+  }
+
+  /**
+   * Closes {@code out} on an error path without throwing.
+   *
+   * @param out the stream to close
+   */
+  private static void closeAfterFailure(FSDataOutputStream out) {
+    try {
+      out.close();
+    } catch (IOException ignored) {
+      // The exception that led here is already propagating; don't mask it.
+    }
+  }
+
+  /**
+   * Intentionally empty: {@link #add(List)} closes each post's file itself,
+   * so no file is left open here. Does not call {@code super.close()}.
+   */
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Configures the output directory and target filesystem and marks the
+   * writer as running. Does not call {@code super.init}; no file is opened
+   * until the first {@link #add(List)}.
+   *
+   * @param conf configuration; {@link SeqFileWriter#OUTPUT_DIR_OPT} (default
+   *     {@code /chukwa}) picks the output directory, and
+   *     {@code writer.hdfs.filesystem}, falling back to
+   *     {@code fs.default.name}, picks the filesystem
+   * @throws WriterException if the filesystem cannot be obtained
+   */
+  @Override
+  public void init(Configuration conf) throws WriterException {
+    try {
+      this.conf = conf;
+      outputDir = conf.get(SeqFileWriter.OUTPUT_DIR_OPT, "/chukwa");
+      // Init time plus a hash of localHostAddr make collisions between writers
+      // unlikely; add() appends a per-post counter to this prefix.
+      baseName = outputDir + "/" + System.currentTimeMillis() + "_"
+          + localHostAddr.hashCode();
+
+      String fsname = conf.get("writer.hdfs.filesystem");
+      if (fsname == null || fsname.equals("")) {
+        // otherwise try to get the filesystem from hadoop
+        fsname = conf.get("fs.default.name");
+      }
+
+      if (fsname == null || fsname.equals("")) {
+        // Nothing configured: use Hadoop's default filesystem rather than
+        // failing on new URI(null).
+        fs = FileSystem.get(conf);
+      } else {
+        fs = FileSystem.get(new URI(fsname), conf);
+      }
+      isRunning = true;
+    } catch (Exception e) {
+      throw new WriterException(e);
+    }
+  }
+
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=893300&r1=893299&r2=893300&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Tue Dec 22 20:08:55 2009
@@ -54,20 +54,20 @@
   public static final String STAT_PERIOD_OPT = "chukwaCollector.stats.period";
   public static final String ROTATE_INTERVAL_OPT = "chukwaCollector.rotateInterval";
   public static final String OUTPUT_DIR_OPT= "chukwaCollector.outputDir";
-  static String localHostAddr = null;
+  protected static String localHostAddr = null;
   
-  final Semaphore lock = new Semaphore(1, true);
+  protected final Semaphore lock = new Semaphore(1, true);
   
-  private FileSystem fs = null;
-  private Configuration conf = null;
+  protected FileSystem fs = null;
+  protected Configuration conf = null;
 
-  private String outputDir = null;
+  protected String outputDir = null;
   private Calendar calendar = Calendar.getInstance();
 
-  private Path currentPath = null;
-  private String currentFileName = null;
+  protected Path currentPath = null;
+  protected String currentFileName = null;
   protected FSDataOutputStream currentOutputStr = null;
-  private SequenceFile.Writer seqFileWriter = null;
+  protected SequenceFile.Writer seqFileWriter = null;
 
   private long timePeriod = -1;
   private long nextTimePeriodComputation = -1;

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java?rev=893300&r1=893299&r2=893300&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java Tue Dec 22 20:08:55 2009
@@ -19,7 +19,6 @@
 
 import junit.framework.TestCase;
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
-import org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.RCheckFTAdaptor;
 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
 import org.apache.hadoop.chukwa.*;
 import java.net.*;