You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/08/28 18:11:58 UTC
svn commit: r808938 - in /hadoop/chukwa/trunk: ./ contrib/chukwa-pig/
src/java/org/apache/hadoop/chukwa/util/
src/test/org/apache/hadoop/chukwa/datacollection/
src/test/org/apache/hadoop/chukwa/datacollection/collector/
src/test/org/apache/hadoop/chukw...
Author: asrabkin
Date: Fri Aug 28 16:11:57 2009
New Revision: 808938
URL: http://svn.apache.org/viewvc?rev=808938&view=rev
Log:
CHUKWA-368. New data integrity validation tool
Added:
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCRValidator.java
Modified:
hadoop/chukwa/trunk/CHANGES.txt
hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java
Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Fri Aug 28 16:11:57 2009
@@ -4,6 +4,8 @@
NEW FEATURES
+ CHUKWA-368. New data integrity validation tool. (asrabkin)
+
CHUKWA-383. Added embed mode for HICC. (Eric Yang)
CHUKWA-382. Added export button to export HICC graph as static image. (Eric Yang)
Modified: hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
Binary files - no diff available.
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java Fri Aug 28 16:11:57 2009
@@ -20,6 +20,7 @@
import java.util.Random;
+import java.util.regex.*;
import org.apache.hadoop.chukwa.*;
import org.apache.hadoop.chukwa.datacollection.*;
import org.apache.hadoop.chukwa.datacollection.adaptor.*;
@@ -27,6 +28,18 @@
import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
import org.apache.hadoop.conf.Configuration;
+/**
+ * Emits chunks at a roughly constant data rate. Chunks are in a very particular
+ * format: the output data is verifiable, but sufficiently non-deterministic
+ * that two different instances of this adaptor are very likely to have
+ * distinct outputs.
+ *
+ *
+ * Each chunk is full of random bytes drawn from an instance of
+ * java.util.Random seeded with the chunk's starting offset xored with the
+ * adaptor's seed (an optional parameter; defaults to the time the parameters
+ * were parsed). The seed is stored, big-endian, in the first eight bytes of each chunk.
+ */
public class ConstRateAdaptor extends Thread implements Adaptor {
private int SLEEP_VARIANCE = 200;
@@ -36,18 +49,17 @@
private long offset;
private int bytesPerSec;
private ChunkReceiver dest;
- private String adaptorID;
-
+ long seed;
+
private volatile boolean stopping = false;
public String getCurrentStatus() {
- return type.trim() + " " + bytesPerSec;
+ return type.trim() + " " + bytesPerSec + " " + seed;
}
public void start(String adaptorID, String type,
long offset, ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
- this.adaptorID = adaptorID;
this.offset = offset;
this.type = type;
this.dest = dest;
@@ -60,7 +72,15 @@
public String parseArgs(String bytesPerSecParam) {
try {
- bytesPerSec = Integer.parseInt(bytesPerSecParam.trim());
+ Matcher m = Pattern.compile("([0-9]+)(?:\\s+([0-9]+))?\\s*").matcher(bytesPerSecParam);
+ if(!m.matches())
+ return null;
+ bytesPerSec = Integer.parseInt(m.group(1));
+ String rate = m.group(2);
+ if(rate != null)
+ seed = Long.parseLong(m.group(2));
+ else
+ seed = System.currentTimeMillis();
} catch (NumberFormatException e) {
//("bad argument to const rate adaptor: [" + bytesPerSecParam + "]");
return null;
@@ -76,12 +96,7 @@
// 3 secs
// FIXME: I think there's still a risk of integer overflow here
int arraySize = (int) (MSToSleep * (long) bytesPerSec / 1000L);
- byte[] data = new byte[arraySize];
- Random dataPattern = new Random(offset);
- offset += data.length;
- dataPattern.nextBytes(data);
- ChunkImpl evt = new ChunkImpl(type, "random data source", offset, data,
- this);
+ ChunkImpl evt = nextChunk(arraySize + 8);
dest.add(evt);
@@ -91,6 +106,21 @@
} // abort silently
}
+ public ChunkImpl nextChunk(int arraySize) {
+ byte[] data = new byte[arraySize];
+ Random dataPattern = new Random(offset ^ seed);
+ long s = this.seed;
+ offset += data.length;
+ dataPattern.nextBytes(data);
+ for(int i=0; i < 8; ++i) {
+ data[7-i] = (byte) (s & 0xFF);
+ s >>= 8;
+ }
+ ChunkImpl evt = new ChunkImpl(type, "random ("+ this.seed+")", offset, data,
+ this);
+ return evt;
+ }
+
public String toString() {
return "const rate " + type;
}
@@ -127,12 +157,23 @@
public static boolean checkChunk(Chunk chunk) {
byte[] data = chunk.getData();
byte[] correctData = new byte[data.length];
- Random dataPattern = new Random(chunk.getSeqID());
+
+ long seed = 0;
+ for(int i=0; i < 8; ++i)
+ seed = (seed << 8) | (0xFF & data[i] );
+
+ seed ^= (chunk.getSeqID() - data.length);
+ Random dataPattern = new Random(seed);
dataPattern.nextBytes(correctData);
- for(int i=0; i < data.length ; ++i)
+ for(int i=8; i < data.length ; ++i)
if(data [i] != correctData[i])
return false;
return true;
}
+
+ void test_init(String type) {
+ this.type = type;
+ seed = System.currentTimeMillis();
+ }
}
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java?rev=808938&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java Fri Aug 28 16:11:57 2009
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+public class ConstRateValidator extends Configured implements Tool{
+
+ public static class ByteRange implements WritableComparable<ByteRange> {
+
+ String stream;
+ public long start;
+ public long len;
+
+ public ByteRange() {
+ start=len=0;
+ }
+
+ public ByteRange(ChunkImpl val) {
+
+ len = val.getLength();
+ start = val.getSeqID() - len;
+ this.stream = val.getSource()+":"+val.getStreamName() ;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ stream = in.readUTF();
+ start = in.readLong();
+ len = in.readLong();
+ }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(stream);
+ out.writeLong(start);
+ out.writeLong(len);
+ }
+
+
+ public static ByteRange read(DataInput in) throws IOException {
+ ByteRange b = new ByteRange();
+ b.readFields(in);
+ return b;
+ }
+
+ @Override
+ public int compareTo(ByteRange o) {
+ int c = stream.compareTo(o.stream);
+ if(c != 0)
+ return c;
+
+ if(start > o.start)
+ return 1;
+ else if (start < o.start)
+ return -1;
+ else {
+ if(len > o.len)
+ return 1;
+ else if(len < o.len)
+ return -1;
+ else
+ return 0;
+ }
+ }
+
+ public boolean equals(Object o) {
+ if(o instanceof ByteRange) {
+ ByteRange rhs = (ByteRange) o;
+ return stream.equals(rhs.stream) && rhs.start == start && rhs.len == len;
+ } else
+ return false;
+ }
+
+ public int hashCode() {
+ return (int) (
+ stream.hashCode() ^ (len>>32) ^ (len & 0xFFFFFFFF) ^ (start >> 32)
+ ^ (start & 0xFFFFFFFF));
+ }
+
+ }
+
+ public static class ValidatorSM {
+ public long ok=0, missingBytes=0,dupBytes=0;
+ long consecDupchunks=0;
+ long nextExpectedStart = 0;
+ public long chunks;
+ public long dupChunks;
+
+ public String closeSM() {
+ if(consecDupchunks > 0)
+ return consecDupchunks + " consecutive duplicate chunks ending at " + consecDupchunks;
+ else
+ return null;
+ }
+
+ public String advanceSM(ByteRange b) {
+ chunks++;
+
+ if(b.start == nextExpectedStart) {
+ String msg = null;
+ if(consecDupchunks > 0)
+ msg = consecDupchunks + " consecutive duplicative chunks ending at " + b.start;
+ consecDupchunks = 0;
+ nextExpectedStart += b.len;
+ ok += b.len;
+ return msg;
+ } else{
+// Text msg = new Text(b.stream + " " + consecOKchunks +
+// "consecutive OK chunks ending at " + nextExpectedStart);
+ String msg;
+ if(b.start < nextExpectedStart) { //duplicate bytes
+ consecDupchunks ++;
+ dupChunks++;
+ long duplicatedBytes;
+ if(b.start + b.len <= nextExpectedStart) {
+ duplicatedBytes = b.len;
+ msg =" dupchunk of length " + b.len + " at " + b.start;
+ } else {
+ duplicatedBytes = b.start + b.len - nextExpectedStart;
+ msg = " overlap of " + duplicatedBytes+ " starting at " + b.start +
+ " (total chunk len ="+b.len+")";
+ }
+ dupBytes += duplicatedBytes;
+ nextExpectedStart = Math.max(b.start + b.len, nextExpectedStart);
+ } else { //b.start > nextExpectedStart ==> missing bytes
+ consecDupchunks = 0;
+ long missing = (b.start - nextExpectedStart);
+ msg = "==Missing "+ missing+ " bytes starting from " + nextExpectedStart;
+ nextExpectedStart = b.start + b.len;
+
+ if(b.start < 0 || b.len < 0)
+ System.out.println("either len or start was negative; something is seriously wrong");
+
+ missingBytes += missing;
+ }
+ return msg;
+ } //end not-OK
+ } //end advance
+ } //end class
+
+ public static class MapClass extends Mapper <ChukwaArchiveKey, ChunkImpl, ByteRange, NullWritable> {
+
+ @Override
+ protected void map(ChukwaArchiveKey key, ChunkImpl val,
+ Mapper<ChukwaArchiveKey, ChunkImpl,ByteRange, NullWritable>.Context context)
+ throws IOException, InterruptedException
+ {
+ boolean valid = ConstRateAdaptor.checkChunk(val);
+ String fname = "unknown";
+
+ InputSplit inSplit = context.getInputSplit();
+ if(inSplit instanceof FileSplit) {
+ FileSplit fs = (FileSplit) inSplit;
+ fname = fs.getPath().getName();
+ }
+
+ if(!valid) {
+ context.getCounter("app", "badchunks").increment(1);
+ }
+ context.write(new ByteRange(val), NullWritable.get());
+ }
+ }
+
+ public static class ReduceClass extends Reducer<ByteRange, NullWritable, Text,Text> {
+
+ ValidatorSM sm;
+ String curStream = "";
+
+ public ReduceClass() {
+ sm = new ValidatorSM();
+ }
+
+// @Override
+// protected void setup(Reducer<ByteRange, NullWritable, Text,Text>.Context context) { }
+
+ @Override
+ protected void reduce(ByteRange b, Iterable<NullWritable> vals,
+ Reducer<ByteRange, NullWritable, Text,Text>.Context context) {
+ try {
+
+ if(!curStream.equals(b.stream)) {
+ if(!curStream.equals("")) {
+ Text cs = new Text(curStream);
+
+ String t = sm.closeSM();
+ if(t != null)
+ context.write(cs, new Text(t));
+
+ context.write(cs, new Text("total of " + sm.chunks + " chunks ("
+ + sm.dupChunks + " dups). " +" High byte =" + (sm.nextExpectedStart-1)));
+
+ context.getCounter("app", "missing bytes").increment(sm.missingBytes);
+ context.getCounter("app", "duplicate bytes").increment(sm.dupBytes);
+ context.getCounter("app", "OK Bytes").increment(sm.ok);
+ }
+
+ System.out.println("rolling over to new stream " + b.stream);
+ curStream = b.stream;
+ sm = new ValidatorSM();
+ }
+
+ String msg = sm.advanceSM(b);
+ if(msg != null)
+ context.write(new Text(b.stream), new Text(msg));
+
+ } catch(InterruptedException e) {
+ } catch(IOException e) {
+ e.printStackTrace();
+ }
+ }
+ } //end reduce class
+
+
+ public static void main(String[] args) throws Exception {
+ // System.out.println("specify -D textOutput=true for text output");
+ int res = ToolRunner.run(new Configuration(),
+ new ConstRateValidator(), args);
+ System.exit(res);
+ }
+
+ @Override
+ public int run(String[] real_args) throws Exception {
+ GenericOptionsParser gop = new GenericOptionsParser(getConf(), real_args);
+ Configuration conf = gop.getConfiguration();
+ String[] args = gop.getRemainingArgs();
+
+ Job validate = new Job(conf);
+
+ validate.setJobName("Chukwa Test pattern validator");
+ validate.setJarByClass(this.getClass());
+
+ validate.setInputFormatClass(SequenceFileInputFormat.class);
+
+ validate.setMapperClass(MapClass.class);
+ validate.setMapOutputKeyClass(ByteRange.class);
+ validate.setMapOutputValueClass(NullWritable.class);
+
+ validate.setReducerClass(ReduceClass.class);
+ validate.setOutputFormatClass(TextOutputFormat.class);
+
+
+ FileInputFormat.setInputPaths(validate, new Path(args[0]));
+ FileOutputFormat.setOutputPath(validate, new Path(args[1]));
+
+ validate.submit();
+ return 0;
+ }
+
+}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java Fri Aug 28 16:11:57 2009
@@ -47,7 +47,7 @@
public static void main(String[] args) throws IOException, URISyntaxException {
if(args.length < 2) {
- System.out.println("usage: Dump pattern1,pattern2,pattern3... file1 file2 file3...");
+ System.out.println("usage: Dump [-s] pattern1,pattern2,pattern3... file1 file2 file3...");
System.exit(-1);
}
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java?rev=808938&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java Fri Aug 28 16:11:57 2009
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+
+import java.io.*;
+import java.util.Calendar;
+import java.util.Random;
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+public class TempFileUtil {
+ public static File makeBinary(int length) throws IOException {
+ File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
+ "chukwaTest");
+ FileOutputStream fos = new FileOutputStream(tmpOutput);
+ Random r = new Random();
+ byte[] randomData = new byte[length];
+ r.nextBytes(randomData);
+ randomData[length - 1] = '\n';// need data to end with \n since default
+ // tailer uses that
+ fos.write(randomData);
+ fos.flush();
+ fos.close();
+ return tmpOutput;
+ }
+
+
+ static class RandSeqFileWriter {
+ java.util.Random r = new java.util.Random();
+ long lastSeqID = 0;
+ public ChunkImpl getARandomChunk() {
+ int ms = r.nextInt(1000);
+ String line = "2008-05-29 10:42:22," + ms
+ + " INFO org.apache.hadoop.dfs.DataNode: Some text goes here"
+ + r.nextInt() + "\n";
+
+ ChunkImpl c = new ChunkImpl("HadoopLogProcessor", "test",
+ line.length() + lastSeqID, line.getBytes(), null);
+ lastSeqID += line.length();
+ c.addTag("cluster=\"foocluster\"");
+ return c;
+ }
+
+ }
+ public static void writeASinkFile(Configuration conf, FileSystem fileSys, Path dest,
+ int chunks) throws IOException {
+ FSDataOutputStream out = fileSys.create(dest);
+
+ Calendar calendar = Calendar.getInstance();
+ SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
+ ChukwaArchiveKey.class, ChunkImpl.class,
+ SequenceFile.CompressionType.NONE, null);
+ RandSeqFileWriter rw = new RandSeqFileWriter();
+ for (int i = 0; i < chunks; ++i) {
+ ChunkImpl chunk = rw.getARandomChunk();
+ ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+ calendar.set(Calendar.YEAR, 2008);
+ calendar.set(Calendar.MONTH, Calendar.MAY);
+ calendar.set(Calendar.DAY_OF_MONTH, 29);
+ calendar.set(Calendar.HOUR, 10);
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+ archiveKey.setTimePartition(calendar.getTimeInMillis());
+ archiveKey.setDataType(chunk.getDataType());
+ archiveKey.setStreamName(chunk.getStreamName());
+ archiveKey.setSeqId(chunk.getSeqID());
+ seqFileWriter.append(archiveKey, chunk);
+ }
+ seqFileWriter.close();
+ out.close();
+ }
+
+
+}
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java Fri Aug 28 16:11:57 2009
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.datacollection;
-
-
-import java.io.*;
-import java.util.Random;
-
-public class TempFileUtil {
- public static File makeBinary(int length) throws IOException {
- File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
- "chukwaTest");
- FileOutputStream fos = new FileOutputStream(tmpOutput);
- Random r = new Random();
- byte[] randomData = new byte[length];
- r.nextBytes(randomData);
- randomData[length - 1] = '\n';// need data to end with \n since default
- // tailer uses that
- fos.write(randomData);
- fos.flush();
- fos.close();
- return tmpOutput;
- }
-}
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java Fri Aug 28 16:11:57 2009
@@ -52,8 +52,9 @@
" testData "+ SEND_RATE + " 0");
Thread.sleep(TEST_DURATION_SECS * 1000);
- String stat = agent.getAdaptorList().get("constSend");
- long kbytesPerSec = Long.valueOf(stat.split(" ")[3]) / TEST_DURATION_SECS / 1000;
+ String[] stat = agent.getAdaptorList().get("constSend").split(" ");
+ long kbytesPerSec = Long.valueOf(stat[stat.length -1]) / TEST_DURATION_SECS / 1000;
+
System.out.println("data rate was " + kbytesPerSec + " kb /second");
assertTrue(kbytesPerSec < WRITE_RATE_KB); //write rate should throttle sends
assertTrue(kbytesPerSec > MIN_ACCEPTABLE_PERCENT* WRITE_RATE_KB / 100);//an assumption, but should hold true
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java Fri Aug 28 16:11:57 2009
@@ -21,10 +21,10 @@
import java.io.File;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
-import org.apache.hadoop.chukwa.datacollection.TempFileUtil;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+import org.apache.hadoop.chukwa.util.TempFileUtil;
import junit.framework.TestCase;
public class TestFailedCollector extends TestCase {
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java Fri Aug 28 16:11:57 2009
@@ -34,6 +34,7 @@
hosts.add("host4");
Configuration conf = new Configuration();
RetryListOfCollectors rloc = new RetryListOfCollectors(hosts, conf);
+ rloc.shuffleList();
assertEquals(hosts.size(), rloc.total());
for (int i = 0; i < hosts.size(); ++i) {
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCRValidator.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCRValidator.java?rev=808938&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCRValidator.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCRValidator.java Fri Aug 28 16:11:57 2009
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import junit.framework.TestCase;
+import static org.apache.hadoop.chukwa.util.ConstRateValidator.ByteRange;
+import static org.apache.hadoop.chukwa.util.ConstRateValidator.ValidatorSM;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.writeASinkFile;
+
+public class TestCRValidator extends TestCase{
+
+ public void testCRchunks() {
+ ConstRateAdaptor adaptor = new ConstRateAdaptor();
+ adaptor.parseArgs("500 200 ");
+ adaptor.test_init("testdata");
+ Chunk c = adaptor.nextChunk(100);
+ assertTrue(ConstRateAdaptor.checkChunk(c));
+ c = adaptor.nextChunk(102);
+ assertTrue(ConstRateAdaptor.checkChunk(c));
+ }
+
+ public void testBasicSM() throws Exception {
+ ValidatorSM sm = new ValidatorSM();
+ byte[] dat = "test".getBytes();
+ ChunkImpl c = new ChunkImpl("Data", "aname", dat.length, dat, null);
+ ByteRange b = new ByteRange(c);
+ assertEquals(4, b.len);
+ assertEquals(0, b.start);
+ String t = sm.advanceSM(b);
+ assertNull(t);
+ if(t != null)
+ System.out.println(t);
+
+ dat = "ing".getBytes();
+ c = new ChunkImpl("Data", "aname", dat.length+4, dat, null);
+ b = new ByteRange(c);
+ assertEquals(4, b.start);
+ t = sm.advanceSM(b);
+ assertNull(t);
+ if(t != null)
+ System.out.println(t);
+
+ b = new ByteRange(new ChunkImpl("Data", "aname", 12, "more".getBytes(), null));
+ t= sm.advanceSM(b);
+ System.out.println(t);
+ }
+
+ public void testSlurping() throws Exception {
+ int NUM_CHUNKS = 10;
+ Configuration conf = new Configuration();
+ FileSystem localfs = FileSystem.getLocal(conf);
+ String baseDir = System.getProperty("test.build.data", "/tmp");
+ Path tmpFile = new Path(baseDir+"/tmpSeqFile.seq");
+ writeASinkFile(conf, localfs, tmpFile, NUM_CHUNKS);
+
+ ValidatorSM sm = new ValidatorSM();
+
+ try {
+ SequenceFile.Reader reader = new SequenceFile.Reader(localfs, tmpFile, conf);
+
+ ChukwaArchiveKey key = new ChukwaArchiveKey();
+ ChunkImpl chunk = ChunkImpl.getBlankChunk();
+
+ while (reader.next(key, chunk)) {
+ String s = sm.advanceSM(new ByteRange(chunk));
+ assertNull(s);
+ }
+ reader.close();
+ assertEquals(NUM_CHUNKS, sm.chunks);
+ localfs.delete(tmpFile);
+ } catch(IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+}