Posted to commits@chukwa.apache.org by as...@apache.org on 2009/08/28 18:11:58 UTC

svn commit: r808938 - in /hadoop/chukwa/trunk: ./ contrib/chukwa-pig/ src/java/org/apache/hadoop/chukwa/util/ src/test/org/apache/hadoop/chukwa/datacollection/ src/test/org/apache/hadoop/chukwa/datacollection/collector/ src/test/org/apache/hadoop/chukw...

Author: asrabkin
Date: Fri Aug 28 16:11:57 2009
New Revision: 808938

URL: http://svn.apache.org/viewvc?rev=808938&view=rev
Log:
CHUKWA-368. New data integrity validation tool
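
For context: the tool pairs ConstRateAdaptor, which emits a verifiable test pattern, with ConstRateValidator, a MapReduce job that scans archived SequenceFiles of chunks and reports missing and duplicate bytes per stream, plus a counter of corrupt chunks. A minimal programmatic invocation mirroring the main() added below; the driver class and both paths are placeholders, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.hadoop.chukwa.util.ConstRateValidator;

    // Hypothetical driver: run the validator over an archive directory.
    // "/chukwa/archive" and "/chukwa/validate-out" are placeholder paths.
    public class RunValidator {
      public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ConstRateValidator(),
            new String[] { "/chukwa/archive", "/chukwa/validate-out" });
        System.exit(res);
      }
    }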

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCRValidator.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Fri Aug 28 16:11:57 2009
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-368. New data integrity validation tool. (asrabkin)
+
     CHUKWA-383. Added embed mode for HICC.  (Eric Yang)
 
     CHUKWA-382. Added export button to export HICC graph as static image.  (Eric Yang)

Modified: hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java Fri Aug 28 16:11:57 2009
@@ -20,6 +20,7 @@
 
 
 import java.util.Random;
+import java.util.regex.*;
 import org.apache.hadoop.chukwa.*;
 import org.apache.hadoop.chukwa.datacollection.*;
 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
@@ -27,6 +28,18 @@
 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
 import org.apache.hadoop.conf.Configuration;
 
+/**
+ * Emits chunks at a roughly constant data rate. Chunks are in a very particular
+ * format: the output data is verifiable, but sufficiently non-deterministic
+ * that two different instances of this adaptor are very likely to have
+ * distinct outputs.
+ * 
+ * Each chunk is full of random bytes: the randomness comes from an instance
+ * of java.util.Random seeded with the chunk's start offset xored with a
+ * per-adaptor seed. The seed defaults to the adaptor's creation time, but an
+ * explicit seed can be passed as an optional second argument; either way it
+ * is stored, big-endian, in the first eight bytes of each chunk.
+ */
 public class ConstRateAdaptor extends Thread implements Adaptor {
 
   private int SLEEP_VARIANCE = 200;
@@ -36,18 +49,17 @@
   private long offset;
   private int bytesPerSec;
   private ChunkReceiver dest;
-  private String adaptorID;
-
+  long seed;
+  
   private volatile boolean stopping = false;
 
   public String getCurrentStatus() {
-    return type.trim() + " " + bytesPerSec;
+    return type.trim() + " " + bytesPerSec + " " + seed;
   }
 
   public void start(String adaptorID, String type, 
       long offset, ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
 
-    this.adaptorID = adaptorID;
     this.offset = offset;
     this.type = type;
     this.dest = dest;
@@ -60,7 +72,15 @@
 
   public String parseArgs(String bytesPerSecParam) {
     try {
-      bytesPerSec = Integer.parseInt(bytesPerSecParam.trim());
+      Matcher m = Pattern.compile("([0-9]+)(?:\\s+([0-9]+))?\\s*").matcher(bytesPerSecParam);
+      if(!m.matches())
+        return null;
+      bytesPerSec = Integer.parseInt(m.group(1));
+      String seedParam = m.group(2);
+      if(seedParam != null)
+        seed = Long.parseLong(seedParam);
+      else
+        seed = System.currentTimeMillis();
     } catch (NumberFormatException e) {
       //("bad argument to const rate adaptor: ["  + bytesPerSecParam + "]");
       return null;
@@ -76,12 +96,7 @@
                                                                // 3 secs
         // FIXME: I think there's still a risk of integer overflow here
         int arraySize = (int) (MSToSleep * (long) bytesPerSec / 1000L);
-        byte[] data = new byte[arraySize];
-        Random dataPattern = new Random(offset);
-        offset += data.length;
-        dataPattern.nextBytes(data);
-        ChunkImpl evt = new ChunkImpl(type, "random data source", offset, data,
-            this);
+        ChunkImpl evt = nextChunk(arraySize + 8);
 
         dest.add(evt);
 
@@ -91,6 +106,21 @@
     } // abort silently
   }
 
+  public ChunkImpl nextChunk(int arraySize) {
+    byte[] data = new byte[arraySize];
+    Random dataPattern = new Random(offset ^ seed);
+    long s = this.seed;
+    offset += data.length;
+    dataPattern.nextBytes(data);
+    for(int i=0; i < 8; ++i)  {
+      data[7-i] = (byte) (s & 0xFF);
+      s >>= 8;
+    }
+    ChunkImpl evt = new ChunkImpl(type, "random ("+ this.seed+")", offset, data,
+        this);
+    return evt;
+  }
+
   public String toString() {
     return "const rate " + type;
   }
@@ -127,12 +157,23 @@
   public static boolean checkChunk(Chunk chunk) {
     byte[] data = chunk.getData();
     byte[] correctData = new byte[data.length];
-    Random dataPattern = new Random(chunk.getSeqID());
+    
+    long seed = 0;
+    for(int i=0; i < 8; ++i) 
+      seed = (seed << 8) | (0xFF & data[i] );
+
+    seed ^= (chunk.getSeqID() - data.length);
+    Random dataPattern = new Random(seed);
     dataPattern.nextBytes(correctData);
-    for(int i=0; i < data.length ; ++i) 
+    for(int i=8; i < data.length ; ++i) 
       if(data [i] != correctData[i])
         return false;
      
     return true;
   }
+  
+  void test_init(String type) {
+    this.type = type;
+    seed = System.currentTimeMillis();
+  }
 }
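
To make the format above concrete: a standalone sketch of the verification scheme, equivalent in spirit to the reworked checkChunk() (the class and method names are illustrative, not part of this commit). The start offset is the chunk's seqID minus its length:

    import java.util.Random;

    // Sketch: verify the const-rate test pattern described in the javadoc.
    // Bytes 0-7 hold the adaptor's seed, big-endian; every byte of the chunk
    // is drawn from a Random seeded with (startOffset ^ seed), but the first
    // eight are then overwritten with the seed, so comparison starts at 8.
    public class TestPatternCheck {
      public static boolean looksValid(byte[] data, long startOffset) {
        if (data.length < 8)
          return false;
        long seed = 0;
        for (int i = 0; i < 8; ++i)              // recover the big-endian seed
          seed = (seed << 8) | (0xFF & data[i]);
        byte[] expected = new byte[data.length];
        new Random(startOffset ^ seed).nextBytes(expected);
        for (int i = 8; i < data.length; ++i)    // skip the seed bytes themselves
          if (data[i] != expected[i])
            return false;
        return true;
      }
    }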

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java?rev=808938&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java Fri Aug 28 16:11:57 2009
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+public class ConstRateValidator extends Configured implements Tool{
+  
+  public static class ByteRange implements WritableComparable<ByteRange> {
+    
+    String stream;
+    public long start;
+    public long len;
+    
+    public ByteRange() {
+      start=len=0;
+    }
+    
+    public ByteRange(ChunkImpl val) {
+      
+      len = val.getLength();
+      start = val.getSeqID() - len;     
+      this.stream = val.getSource()+":"+val.getStreamName() ;
+    }
+    
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      stream = in.readUTF();
+      start = in.readLong();
+      len = in.readLong();
+    }
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(stream);
+      out.writeLong(start);
+      out.writeLong(len);
+    }
+    
+
+    public static ByteRange read(DataInput in) throws IOException {
+      ByteRange b = new ByteRange();
+      b.readFields(in);
+      return b;
+    }
+    
+    @Override
+    public int compareTo(ByteRange o) {
+      int c = stream.compareTo(o.stream);
+      if(c != 0)
+        return c;
+      
+      if(start > o.start)
+        return 1;
+      else if (start < o.start)
+        return -1;
+      else {
+        if(len > o.len)
+          return 1;
+        else if(len < o.len)
+          return -1;
+        else
+          return 0;
+      }
+    }
+    
+    public boolean equals(Object o) {
+      if(o instanceof ByteRange) {
+        ByteRange rhs = (ByteRange) o;
+        return stream.equals(rhs.stream) && rhs.start == start && rhs.len == len;
+      } else
+        return false;
+    }
+    
+    public int hashCode() {
+      return (int) (
+          stream.hashCode() ^ (len >> 32) ^ (len & 0xFFFFFFFFL) ^ (start >> 32)
+          ^ (start & 0xFFFFFFFFL));
+    }
+    
+  }
+  
+  public static class ValidatorSM {
+    public long ok=0, missingBytes=0,dupBytes=0;
+    long consecDupchunks=0;
+    long nextExpectedStart = 0;
+    public long chunks;
+    public long dupChunks;
+
+    public String closeSM() {
+      if(consecDupchunks > 0)
+        return consecDupchunks + " consecutive duplicate chunks ending at " + nextExpectedStart;
+      else
+        return null;
+    }
+    
+    public String advanceSM(ByteRange b) {
+      chunks++;
+      
+      if(b.start == nextExpectedStart) {
+        String msg = null;
+        if(consecDupchunks > 0)
+          msg = consecDupchunks + " consecutive duplicate chunks ending at " + b.start;
+        consecDupchunks = 0;
+        nextExpectedStart += b.len;
+        ok += b.len;
+        return msg;
+      } else{
+//        Text msg = new Text(b.stream + " " + consecOKchunks + 
+//            "consecutive OK chunks ending at " + nextExpectedStart);
+        String msg;
+        if(b.start < nextExpectedStart) {    //duplicate bytes
+          consecDupchunks ++;
+          dupChunks++;
+          long duplicatedBytes;
+          if(b.start + b.len <= nextExpectedStart) {
+            duplicatedBytes = b.len;
+            msg =" dupchunk of length " + b.len + " at " + b.start;
+          } else {
+            duplicatedBytes = b.start + b.len - nextExpectedStart;
+            msg = "  overlap of " + duplicatedBytes+ " starting at " + b.start +
+            " (total chunk len ="+b.len+")";
+          }
+          dupBytes += duplicatedBytes;
+          nextExpectedStart = Math.max(b.start + b.len, nextExpectedStart);
+        } else {  //b.start > nextExpectedStart  ==>  missing bytes
+          consecDupchunks = 0;
+          long missing = (b.start - nextExpectedStart);
+          msg = "==Missing "+ missing+ " bytes starting from " + nextExpectedStart;
+          nextExpectedStart = b.start + b.len;
+          
+          if(b.start < 0 || b.len < 0)
+            System.out.println("either len or start was negative; something is seriously wrong");
+          
+          missingBytes += missing;
+        }
+        return msg;
+      } //end not-OK  
+    } //end advance
+  } //end class
+  
+  public static class MapClass extends Mapper <ChukwaArchiveKey, ChunkImpl, ByteRange, NullWritable> {
+    
+    @Override
+    protected void map(ChukwaArchiveKey key, ChunkImpl val, 
+        Mapper<ChukwaArchiveKey, ChunkImpl,ByteRange, NullWritable>.Context context)
+        throws IOException, InterruptedException 
+    {
+      boolean valid = ConstRateAdaptor.checkChunk(val);
+      String fname = "unknown";
+      
+      InputSplit inSplit = context.getInputSplit();
+      if(inSplit instanceof FileSplit) {
+        FileSplit fs = (FileSplit) inSplit;
+        fname = fs.getPath().getName();
+      }
+      
+      if(!valid) {
+        context.getCounter("app", "badchunks").increment(1);
+      }
+      context.write(new ByteRange(val), NullWritable.get());
+    }
+  }
+    
+  public static class ReduceClass extends Reducer<ByteRange, NullWritable, Text,Text> {
+    
+    ValidatorSM sm;
+    String curStream = "";
+    
+    public ReduceClass() {
+      sm = new ValidatorSM();
+    }
+    
+//    @Override
+//    protected void setup(Reducer<ByteRange, NullWritable, Text,Text>.Context context) {       }
+    
+    @Override
+    protected void reduce(ByteRange b, Iterable<NullWritable> vals, 
+        Reducer<ByteRange, NullWritable, Text,Text>.Context context) {
+      try {
+        if(!curStream.equals(b.stream)) {
+          if(!curStream.equals("")) {
+            Text cs = new Text(curStream);
+
+            String t = sm.closeSM();
+            if(t != null)
+              context.write(cs, new Text(t));
+
+            context.write(cs, new Text("total of " + sm.chunks + " chunks ("
+                + sm.dupChunks + " dups). " + " High byte =" + (sm.nextExpectedStart-1)));
+
+            context.getCounter("app", "missing bytes").increment(sm.missingBytes);
+            context.getCounter("app", "duplicate bytes").increment(sm.dupBytes);
+            context.getCounter("app", "OK Bytes").increment(sm.ok);
+          }
+
+          System.out.println("rolling over to new stream " + b.stream);
+          curStream = b.stream;
+          sm = new ValidatorSM();
+        }
+
+        String msg = sm.advanceSM(b);
+        if(msg != null)
+          context.write(new Text(b.stream), new Text(msg));
+
+      } catch(InterruptedException e) {
+      } catch(IOException e) {
+        e.printStackTrace();
+      }
+    }
+  } //end reduce class
+
+
+  public static void main(String[] args) throws Exception {
+ //   System.out.println("specify -D textOutput=true for text output");
+    int res = ToolRunner.run(new Configuration(),
+        new ConstRateValidator(), args);
+    System.exit(res);
+  }
+
+  @Override
+  public int run(String[] real_args) throws Exception {
+    GenericOptionsParser gop = new GenericOptionsParser(getConf(), real_args);
+    Configuration conf = gop.getConfiguration();
+    String[] args = gop.getRemainingArgs();
+
+    Job validate = new Job(conf);
+    
+    validate.setJobName("Chukwa Test pattern validator");
+    validate.setJarByClass(this.getClass());
+    
+    validate.setInputFormatClass(SequenceFileInputFormat.class);
+    
+    validate.setMapperClass(MapClass.class);
+    validate.setMapOutputKeyClass(ByteRange.class);
+    validate.setMapOutputValueClass(NullWritable.class);
+
+    validate.setReducerClass(ReduceClass.class);
+    validate.setOutputFormatClass(TextOutputFormat.class);
+
+    
+    FileInputFormat.setInputPaths(validate, new Path(args[0]));
+    FileOutputFormat.setOutputPath(validate, new Path(args[1]));
+
+    validate.submit();
+    return 0;
+  }
+
+}
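
ValidatorSM is the heart of the reducer: ranges arrive sorted per stream, and a chunk starting exactly at nextExpectedStart is counted OK, one starting earlier is a duplicate or overlap, and one starting later means missing bytes. A small same-package driver sketch (same package because ByteRange.stream is package-private; the class name, stream name, and offsets are made up):

    package org.apache.hadoop.chukwa.util;

    import org.apache.hadoop.chukwa.util.ConstRateValidator.ByteRange;
    import org.apache.hadoop.chukwa.util.ConstRateValidator.ValidatorSM;

    // Illustrative only: drive the state machine with two hand-built ranges.
    public class ValidatorSMSketch {
      public static void main(String[] args) {
        ValidatorSM sm = new ValidatorSM();

        ByteRange b = new ByteRange();
        b.stream = "host1:testdata"; b.start = 0; b.len = 100;
        System.out.println(sm.advanceSM(b));  // null: contiguous from offset 0

        b = new ByteRange();
        b.stream = "host1:testdata"; b.start = 150; b.len = 50;
        System.out.println(sm.advanceSM(b));  // "==Missing 50 bytes starting from 100"
      }
    }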

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java Fri Aug 28 16:11:57 2009
@@ -47,7 +47,7 @@
   public static void main(String[] args) throws IOException, URISyntaxException {
     
     if(args.length < 2) {
-      System.out.println("usage: Dump pattern1,pattern2,pattern3... file1 file2 file3...");
+      System.out.println("usage: Dump [-s] pattern1,pattern2,pattern3... file1 file2 file3...");
       System.exit(-1);
     }
     

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java?rev=808938&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java Fri Aug 28 16:11:57 2009
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+
+import java.io.*;
+import java.util.Calendar;
+import java.util.Random;
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+public class TempFileUtil {
+  public static File makeBinary(int length) throws IOException {
+    File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
+        "chukwaTest");
+    FileOutputStream fos = new FileOutputStream(tmpOutput);
+    Random r = new Random();
+    byte[] randomData = new byte[length];
+    r.nextBytes(randomData);
+    randomData[length - 1] = '\n';// need data to end with \n since default
+                                  // tailer uses that
+    fos.write(randomData);
+    fos.flush();
+    fos.close();
+    return tmpOutput;
+  }
+  
+
+  static class RandSeqFileWriter {
+    java.util.Random r = new java.util.Random();
+    long lastSeqID = 0;
+     public ChunkImpl getARandomChunk() {
+       int ms = r.nextInt(1000);
+       String line = "2008-05-29 10:42:22," + ms
+           + " INFO org.apache.hadoop.dfs.DataNode: Some text goes here"
+           + r.nextInt() + "\n";
+   
+       ChunkImpl c = new ChunkImpl("HadoopLogProcessor", "test",
+           line.length()  + lastSeqID, line.getBytes(), null);
+       lastSeqID += line.length();
+       c.addTag("cluster=\"foocluster\"");
+       return c;
+     }
+
+  }
+   public static void writeASinkFile(Configuration conf, FileSystem fileSys, Path dest,
+       int chunks) throws IOException {
+     FSDataOutputStream out = fileSys.create(dest);
+
+     Calendar calendar = Calendar.getInstance();
+     SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
+         ChukwaArchiveKey.class, ChunkImpl.class,
+         SequenceFile.CompressionType.NONE, null);
+     RandSeqFileWriter rw = new RandSeqFileWriter();
+     for (int i = 0; i < chunks; ++i) {
+       ChunkImpl chunk = rw.getARandomChunk();
+       ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+       calendar.set(Calendar.YEAR, 2008);
+       calendar.set(Calendar.MONTH, Calendar.MAY);
+       calendar.set(Calendar.DAY_OF_MONTH, 29);
+       calendar.set(Calendar.HOUR, 10);
+       calendar.set(Calendar.MINUTE, 0);
+       calendar.set(Calendar.SECOND, 0);
+       calendar.set(Calendar.MILLISECOND, 0);
+       archiveKey.setTimePartition(calendar.getTimeInMillis());
+       archiveKey.setDataType(chunk.getDataType());
+       archiveKey.setStreamName(chunk.getStreamName());
+       archiveKey.setSeqId(chunk.getSeqID());
+       seqFileWriter.append(archiveKey, chunk);
+     }
+     seqFileWriter.close();
+     out.close();
+   }
+   
+  
+}
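
For tests that need sink files, the new writeASinkFile() synthesizes a SequenceFile of random log chunks. A minimal local-filesystem usage sketch; the wrapper class and destination path are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.chukwa.util.TempFileUtil;

    // Write a 50-chunk synthetic sink file locally; the path is illustrative.
    public class SinkFileSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem localfs = FileSystem.getLocal(conf);
        TempFileUtil.writeASinkFile(conf, localfs, new Path("/tmp/testSink.seq"), 50);
      }
    }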

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java Fri Aug 28 16:11:57 2009
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.chukwa.datacollection;
-
-
-import java.io.*;
-import java.util.Random;
-
-public class TempFileUtil {
-  public static File makeBinary(int length) throws IOException {
-    File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
-        "chukwaTest");
-    FileOutputStream fos = new FileOutputStream(tmpOutput);
-    Random r = new Random();
-    byte[] randomData = new byte[length];
-    r.nextBytes(randomData);
-    randomData[length - 1] = '\n';// need data to end with \n since default
-                                  // tailer uses that
-    fos.write(randomData);
-    fos.flush();
-    fos.close();
-    return tmpOutput;
-  }
-}

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestBackpressure.java Fri Aug 28 16:11:57 2009
@@ -52,8 +52,9 @@
         " testData "+ SEND_RATE + " 0");
     Thread.sleep(TEST_DURATION_SECS * 1000);
 
-    String stat = agent.getAdaptorList().get("constSend");
-    long kbytesPerSec = Long.valueOf(stat.split(" ")[3]) / TEST_DURATION_SECS / 1000;
+    String[] stat = agent.getAdaptorList().get("constSend").split(" ");
+    long kbytesPerSec = Long.valueOf(stat[stat.length -1]) / TEST_DURATION_SECS / 1000;
+
     System.out.println("data rate was " + kbytesPerSec + " kb /second");
     assertTrue(kbytesPerSec < WRITE_RATE_KB); //write rate should throttle sends
     assertTrue(kbytesPerSec > MIN_ACCEPTABLE_PERCENT* WRITE_RATE_KB / 100);//an assumption, but should hold true

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java Fri Aug 28 16:11:57 2009
@@ -21,10 +21,10 @@
 
 import java.io.File;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
-import org.apache.hadoop.chukwa.datacollection.TempFileUtil;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+import org.apache.hadoop.chukwa.util.TempFileUtil;
 import junit.framework.TestCase;
 
 public class TestFailedCollector extends TestCase {

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java?rev=808938&r1=808937&r2=808938&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java Fri Aug 28 16:11:57 2009
@@ -34,6 +34,7 @@
     hosts.add("host4");
     Configuration conf = new Configuration();
     RetryListOfCollectors rloc = new RetryListOfCollectors(hosts, conf);
+    rloc.shuffleList();
     assertEquals(hosts.size(), rloc.total());
 
     for (int i = 0; i < hosts.size(); ++i) {

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCRValidator.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCRValidator.java?rev=808938&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCRValidator.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCRValidator.java Fri Aug 28 16:11:57 2009
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import junit.framework.TestCase;
+import static org.apache.hadoop.chukwa.util.ConstRateValidator.ByteRange;
+import static org.apache.hadoop.chukwa.util.ConstRateValidator.ValidatorSM;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.writeASinkFile;
+
+public class TestCRValidator extends TestCase{
+  
+  public void testCRchunks() {
+    ConstRateAdaptor adaptor = new ConstRateAdaptor();
+    adaptor.parseArgs("500  200 ");
+    adaptor.test_init("testdata");
+    Chunk c = adaptor.nextChunk(100);
+    assertTrue(ConstRateAdaptor.checkChunk(c));
+    c = adaptor.nextChunk(102);
+    assertTrue(ConstRateAdaptor.checkChunk(c));
+  }
+  
+  public void testBasicSM() throws Exception {
+    ValidatorSM sm = new ValidatorSM();
+    byte[] dat = "test".getBytes();    
+    ChunkImpl c = new ChunkImpl("Data", "aname", dat.length, dat, null);
+    ByteRange b = new ByteRange(c);
+    assertEquals(4, b.len);
+    assertEquals(0, b.start);
+    String t = sm.advanceSM(b);
+    if(t != null)
+      System.out.println(t);
+    assertNull(t);
+
+    dat = "ing".getBytes();
+    c = new ChunkImpl("Data", "aname", dat.length+4, dat, null);
+    b = new ByteRange(c);
+    assertEquals(4, b.start);
+    t = sm.advanceSM(b);
+    if(t != null)
+      System.out.println(t);
+    assertNull(t);
+    
+    b = new ByteRange(new ChunkImpl("Data", "aname", 12, "more".getBytes(), null));
+    t = sm.advanceSM(b);
+    System.out.println(t);
+  
+  public void testSlurping() throws Exception {
+    int NUM_CHUNKS = 10;
+    Configuration conf = new Configuration();
+    FileSystem localfs = FileSystem.getLocal(conf);
+    String baseDir = System.getProperty("test.build.data", "/tmp");
+    Path tmpFile = new Path(baseDir+"/tmpSeqFile.seq");
+    writeASinkFile(conf, localfs, tmpFile, NUM_CHUNKS);
+     
+    ValidatorSM sm = new ValidatorSM();
+    
+    try {
+      SequenceFile.Reader reader = new SequenceFile.Reader(localfs, tmpFile, conf);
+
+      ChukwaArchiveKey key = new ChukwaArchiveKey();
+      ChunkImpl chunk = ChunkImpl.getBlankChunk();
+
+      while (reader.next(key, chunk)) {
+          String s = sm.advanceSM(new ByteRange(chunk));
+          assertNull(s);
+      }
+      reader.close();
+      assertEquals(NUM_CHUNKS, sm.chunks);      
+      localfs.delete(tmpFile);
+    } catch(IOException e) {
+      e.printStackTrace();
+    }
+    
+  }
+
+}