You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/12/22 21:08:57 UTC
svn commit: r893300 - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/datacollection/test/
src/java/org/apache/hadoop/chukwa/datacollection/writer/
src/test/org/apache/hadoop/chukwa/datacollection/adaptor/
Author: asrabkin
Date: Tue Dec 22 20:08:55 2009
New Revision: 893300
URL: http://svn.apache.org/viewvc?rev=893300&view=rev
Log:
CHUKWA-433. File-per-post writer
Added:
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
Modified:
hadoop/chukwa/trunk/CHANGES.txt
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java
Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=893300&r1=893299&r2=893300&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Tue Dec 22 20:08:55 2009
@@ -4,6 +4,8 @@
NEW FEATURES
+ CHUKWA-433. File-per-post writer for benchmark purposes. (asrabkin)
+
CHUKWA-431. UDP Adaptor. (asrabkin)
CHUKWA-46. Ability to allow only local connections to agent control port. (asrabkin)
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java?rev=893300&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java Tue Dec 22 20:08:55 2009
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.test;
+
+import java.io.IOException;
+import java.net.URI;
+
+
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.writer.*;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.mortbay.log.Log;
+
+/**
+ * A writer that writes a separate file for each post. Intended ONLY for
+ * architectural performance comparisons. Do not use this in production.
+ *
+ * <p>Each call to {@link #add(List)} creates a new, uncompressed SequenceFile
+ * named {@code <outputDir>/<initTimeMillis>_<hostAddrHash>_<n>.done}, delegates
+ * the chunks to {@link SeqFileWriter#add(List)}, and then closes that file.
+ * Note that the file carries its final {@code .done} name while it is still
+ * being written.
+ */
+public class FilePerPostWriter extends SeqFileWriter {
+
+  /** Common prefix of every file this writer creates; set in {@link #init}. */
+  String baseName;
+  /** Sequence number appended to {@link #baseName} to make each file name unique. */
+  AtomicLong counter = new AtomicLong(0);
+
+  /**
+   * Writes one post's chunks to a brand-new SequenceFile and closes it.
+   *
+   * <p>Synchronized because the per-post stream and writer are handed to
+   * {@link SeqFileWriter#add(List)} through fields inherited from the parent
+   * ({@code seqFileWriter}, {@code currentOutputStr}, ...). Without it, two
+   * concurrent posts could overwrite each other's fields, and one post would
+   * write to, or close, the other's file.
+   *
+   * @param chunks the chunks from a single post
+   * @return the status reported by {@link SeqFileWriter#add(List)}
+   * @throws WriterException if the file cannot be created, written or closed
+   */
+  @Override
+  public synchronized CommitStatus add(List<Chunk> chunks) throws WriterException {
+    String newName = baseName + "_" + counter.incrementAndGet();
+    Path newOutputPath = new Path(newName + ".done");
+    FSDataOutputStream newOutputStr = null;
+    SequenceFile.Writer newSeqWriter = null;
+    try {
+      newOutputStr = fs.create(newOutputPath);
+      // Uncompressed for now
+      newSeqWriter = SequenceFile.createWriter(conf, newOutputStr,
+          ChukwaArchiveKey.class, ChunkImpl.class,
+          SequenceFile.CompressionType.NONE, null);
+
+      currentOutputStr = newOutputStr;
+      currentPath = newOutputPath;
+      currentFileName = newName;
+      seqFileWriter = newSeqWriter;
+
+      CommitStatus status = super.add(chunks);
+
+      // A SequenceFile.Writer created on a caller-supplied stream does not
+      // own that stream, so both must be closed explicitly. Errors here are
+      // propagated: the post must not be reported as committed if the data
+      // could not be flushed.
+      newSeqWriter.close();
+      newSeqWriter = null;
+      newOutputStr.close();
+      newOutputStr = null;
+      return status;
+    } catch (IOException e) {
+      throw new WriterException(e);
+    } finally {
+      // Handles are still non-null only if something above failed; release
+      // them without masking the exception that is already propagating.
+      if (newSeqWriter != null) {
+        try {
+          newSeqWriter.close();
+        } catch (IOException ignored) {
+          // already failing; keep the original exception
+        }
+      }
+      if (newOutputStr != null) {
+        try {
+          newOutputStr.close();
+        } catch (IOException ignored) {
+          // already failing; keep the original exception
+        }
+      }
+    }
+  }
+
+  /**
+   * Intentionally a no-op: every file is closed at the end of the
+   * {@link #add(List)} call that created it, so nothing is left open here.
+   */
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Configures the output directory, the file-name prefix and the target
+   * filesystem, then marks the writer as running.
+   *
+   * @param conf configuration; reads {@code SeqFileWriter.OUTPUT_DIR_OPT}
+   *        (default {@code /chukwa}), {@code writer.hdfs.filesystem} and, as a
+   *        fallback, {@code fs.default.name}
+   * @throws WriterException wrapping any failure, including a missing
+   *         filesystem setting or a malformed filesystem URI
+   */
+  @Override
+  public void init(Configuration conf) throws WriterException {
+    try {
+      this.conf = conf;
+      outputDir = conf.get(SeqFileWriter.OUTPUT_DIR_OPT, "/chukwa");
+      // NOTE(review): the host-address hash presumably keeps names from
+      // different collectors sharing one output directory distinct.
+      baseName = outputDir + "/" + System.currentTimeMillis() + "_" + localHostAddr.hashCode();
+
+      String fsname = conf.get("writer.hdfs.filesystem");
+      if (fsname == null || fsname.equals("")) {
+        // otherwise try to get the filesystem from hadoop
+        fsname = conf.get("fs.default.name");
+      }
+      if (fsname == null) {
+        // Fail with a clear message instead of an NPE from new URI(null).
+        throw new IllegalArgumentException(
+            "neither writer.hdfs.filesystem nor fs.default.name is configured");
+      }
+
+      fs = FileSystem.get(new URI(fsname), conf);
+      isRunning = true;
+    } catch (Exception e) {
+      throw new WriterException(e);
+    }
+  }
+
+}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=893300&r1=893299&r2=893300&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Tue Dec 22 20:08:55 2009
@@ -54,20 +54,20 @@
public static final String STAT_PERIOD_OPT = "chukwaCollector.stats.period";
public static final String ROTATE_INTERVAL_OPT = "chukwaCollector.rotateInterval";
public static final String OUTPUT_DIR_OPT= "chukwaCollector.outputDir";
- static String localHostAddr = null;
+ protected static String localHostAddr = null;
- final Semaphore lock = new Semaphore(1, true);
+ protected final Semaphore lock = new Semaphore(1, true);
- private FileSystem fs = null;
- private Configuration conf = null;
+ protected FileSystem fs = null;
+ protected Configuration conf = null;
- private String outputDir = null;
+ protected String outputDir = null;
private Calendar calendar = Calendar.getInstance();
- private Path currentPath = null;
- private String currentFileName = null;
+ protected Path currentPath = null;
+ protected String currentFileName = null;
protected FSDataOutputStream currentOutputStr = null;
- private SequenceFile.Writer seqFileWriter = null;
+ protected SequenceFile.Writer seqFileWriter = null;
private long timePeriod = -1;
private long nextTimePeriodComputation = -1;
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java?rev=893300&r1=893299&r2=893300&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestUDPAdaptor.java Tue Dec 22 20:08:55 2009
@@ -19,7 +19,6 @@
import junit.framework.TestCase;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
-import org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.RCheckFTAdaptor;
import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
import org.apache.hadoop.chukwa.*;
import java.net.*;