Posted to commits@hbase.apache.org by st...@apache.org on 2009/09/04 23:38:25 UTC

svn commit: r811559 - in /hadoop/hbase/trunk: CHANGES.txt src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java

Author: stack
Date: Fri Sep  4 21:38:25 2009
New Revision: 811559

URL: http://svn.apache.org/viewvc?rev=811559&view=rev
Log:
HBASE-1778 Improve PerformanceEvaluation

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=811559&r1=811558&r2=811559&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri Sep  4 21:38:25 2009
@@ -12,6 +12,7 @@
    HBASE-1798  [Regression] Unable to delete a row in the future
    HBASE-1790  filters are not working correctly (HBASE-1710 HBASE-1807 too)
    HBASE-1779  ThriftServer logged error if getVer() result is empty
+   HBASE-1778  Improve PerformanceEvaluation (Schubert Zhang via Stack)
 
   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=811559&r1=811558&r2=811559&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java Fri Sep  4 21:38:25 2009
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.text.SimpleDateFormat;
@@ -35,6 +37,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Get;
@@ -42,8 +46,8 @@
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -52,17 +56,19 @@
 import org.apache.hadoop.hbase.util.MurmurHash;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
+import org.apache.hadoop.util.LineReader;
 
 
 /**
@@ -94,7 +100,7 @@
   protected static final HTableDescriptor TABLE_DESCRIPTOR;
   static {
     TABLE_DESCRIPTOR = new HTableDescriptor("TestTable");
-    TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(CATALOG_FAMILY));
+    TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(FAMILY_NAME));
   }
   
   private static final String RANDOM_READ = "randomRead";
@@ -160,46 +166,214 @@
   }
   
   /**
+   *  This class serves both as the InputSplit of the Performance
+   *  Evaluation MapReduce InputFormat and as the record value read by
+   *  its RecordReader. Each map task reads exactly one record from its
+   *  PeInputSplit; the record value is the PeInputSplit itself.
+   */
+  public static class PeInputSplit extends InputSplit implements Writable {
+    private int startRow = 0;
+    private int rows = 0;
+    private int totalRows = 0;
+    private int clients = 0;
+      
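+    // Hadoop creates split instances reflectively when deserializing
+    // them in the map task, so a no-argument constructor is required.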
+    public PeInputSplit() {
+      this.startRow = 0;
+      this.rows = 0;
+      this.totalRows = 0;
+      this.clients = 0;
+    }
+    
+    public PeInputSplit(int startRow, int rows, int totalRows, int clients) {
+      this.startRow = startRow;
+      this.rows = rows;
+      this.totalRows = totalRows;
+      this.clients = clients;
+    }
+    
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.startRow = in.readInt();
+      this.rows = in.readInt();
+      this.totalRows = in.readInt();
+      this.clients = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(startRow);
+      out.writeInt(rows);
+      out.writeInt(totalRows);
+      out.writeInt(clients);
+    }
+    
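+    // This split carries no underlying file data, so report zero length
+    // and no preferred hosts; the map task can be scheduled anywhere.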
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return 0;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return new String[0];
+    }
+    
+    public int getStartRow() {
+      return startRow;
+    }
+    
+    public int getRows() {
+      return rows;
+    }
+    
+    public int getTotalRows() {
+      return totalRows;
+    }
+    
+    public int getClients() {
+      return clients;
+    }
+  }
+  
+  /**
+   *  InputFormat of the Performance Evaluation MapReduce job.
+   *  It extends FileInputFormat in order to reuse methods such as
+   *  setInputPaths().
+   */
+  public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> {
+
+    @Override
+    public List<InputSplit> getSplits(JobContext job) throws IOException {
+      // generate splits
+      List<InputSplit> splitList = new ArrayList<InputSplit>();
+      
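+      // Each line of the control file that matches LINE_PATTERN describes
+      // one client's share of the work and becomes its own split.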
+      for (FileStatus file: listStatus(job)) {
+        Path path = file.getPath();
+        FileSystem fs = path.getFileSystem(job.getConfiguration());
+        FSDataInputStream fileIn = fs.open(path);
+        LineReader in = new LineReader(fileIn, job.getConfiguration());
+        int lineLen = 0;
+        while(true) {
+          Text lineText = new Text();
+          lineLen = in.readLine(lineText);
+          if(lineLen <= 0) {
+            break;
+          }
+          Matcher m = LINE_PATTERN.matcher(lineText.toString());
+          if((m != null) && m.matches()) {
+            int startRow = Integer.parseInt(m.group(1));
+            int rows = Integer.parseInt(m.group(2));
+            int totalRows = Integer.parseInt(m.group(3));
+            int clients = Integer.parseInt(m.group(4));
+            
+            LOG.debug("split[" + splitList.size() + "]" +
+                " startRow=" + startRow +
+                " rows=" + rows +
+                " totalRows=" + totalRows +
+                " clients=" + clients);
+
+            PeInputSplit newSplit = new PeInputSplit(startRow, rows, totalRows, clients);
+            splitList.add(newSplit);
+          }
+        }
+        in.close();
+      }
+      
+      LOG.info("Total # of splits: " + splitList.size());
+      return splitList;
+    }
+    
+    @Override
+    public RecordReader<NullWritable, PeInputSplit> createRecordReader(
+        InputSplit split, TaskAttemptContext context) {
+      return new PeRecordReader();
+    }
+
+    public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> {
+      private boolean readOver = false;
+      private PeInputSplit split = null;
+      private NullWritable key = null;
+      private PeInputSplit value = null;
+      
+      @Override
+      public void initialize(InputSplit split, TaskAttemptContext context)
+          throws IOException, InterruptedException {
+        this.readOver = false;
+        this.split = (PeInputSplit)split;
+      }
+      
+      @Override
+      public boolean nextKeyValue() throws IOException, InterruptedException {
+        if(readOver) {
+          return false;
+        }
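+
+        // Each split yields exactly one record: the key is null and the
+        // value is the split itself, carrying the client's parameters.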
+        key = NullWritable.get();
+        value = split;
+
+        readOver = true;
+        return true;
+      }
+      
+      @Override
+      public NullWritable getCurrentKey() throws IOException, InterruptedException {
+        return key;
+      }
+      
+      @Override
+      public PeInputSplit getCurrentValue() throws IOException, InterruptedException {
+        return value;
+      }
+      
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        if(readOver) {
+          return 1.0f;
+        } else {
+          return 0.0f;
+        }
+      }
+      
+      @Override
+      public void close() throws IOException {
+        // do nothing
+      }
+    }
+  }
+  
+  /**
    * MapReduce job that runs a performance evaluation client in each map task.
    */
-  @SuppressWarnings("unchecked")
-  public static class EvaluationMapTask extends MapReduceBase
-  implements Mapper {
+  public static class EvaluationMapTask 
+      extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> {
+
     /** configuration parameter name that contains the command */
     public final static String CMD_KEY = "EvaluationMapTask.command";
     private String cmd;
     private PerformanceEvaluation pe;
     
     @Override
-    public void configure(JobConf j) {
-      this.cmd = j.get(CMD_KEY);
-
-      this.pe = new PerformanceEvaluation(new HBaseConfiguration(j));
+    protected void setup(Context context) throws IOException, InterruptedException {
+      this.cmd = context.getConfiguration().get(CMD_KEY);
+      this.pe = new PerformanceEvaluation(new HBaseConfiguration(context.getConfiguration()));
     }
     
-    public void map(final Object key,
-      final Object value, final OutputCollector output,
-      final Reporter reporter)
-    throws IOException {
-      Matcher m = LINE_PATTERN.matcher(((Text)value).toString());
-      if (m != null && m.matches()) {
-        int startRow = Integer.parseInt(m.group(1));
-        int perClientRunRows = Integer.parseInt(m.group(2));
-        int totalRows = Integer.parseInt(m.group(3));
-        Status status = new Status() {
-          public void setStatus(String msg) {
-            reporter.setStatus(msg);
-          }
-        };
-        long elapsedTime =  this.pe.runOneClient(this.cmd, startRow,
-          perClientRunRows, totalRows, status);
-        // Collect how much time the thing took.  Report as map output and
-        // to the ELAPSED_TIME counter.
-        reporter.incrCounter(Counter.ELAPSED_TIME, elapsedTime);
-        reporter.incrCounter(Counter.ROWS, perClientRunRows);
-        output.collect(new LongWritable(startRow),
-          new Text(Long.toString(elapsedTime)));
-      }
+    protected void map(NullWritable key, PeInputSplit value, final Context context)
+        throws IOException, InterruptedException {
+      
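+      // Bridge PerformanceEvaluation's Status callback to the task's
+      // status string so progress shows up in the job UI.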
+      Status status = new Status() {
+        public void setStatus(String msg) {
+          context.setStatus(msg);
+        }
+      };
+      
+      // Evaluation task
+      long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
+          value.getRows(), value.getTotalRows(), status);
+      // Collect how much time the thing took. Report as map output and
+      // to the ELAPSED_TIME counter.
+      context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
+      context.getCounter(Counter.ROWS).increment(value.getRows());
+      context.write(new LongWritable(value.getStartRow()), new LongWritable(elapsedTime));
+      context.progress();
     }
   }
   
@@ -225,7 +399,7 @@
    * @throws IOException
    */
   private void runNIsMoreThanOne(final String cmd)
-  throws IOException {
+  throws IOException, InterruptedException, ClassNotFoundException {
     checkTable(new HBaseAdmin(conf));
     if (this.nomapred) {
       doMultipleClients(cmd);
@@ -288,21 +462,29 @@
    * @param cmd Command to run.
    * @throws IOException
    */
-  private void doMapReduce(final String cmd) throws IOException {
+  private void doMapReduce(final String cmd) throws IOException,
+      InterruptedException, ClassNotFoundException {
     Path inputDir = writeInputFile(this.conf);
     this.conf.set(EvaluationMapTask.CMD_KEY, cmd);
-    JobConf job = new JobConf(this.conf, this.getClass());
-    FileInputFormat.setInputPaths(job, inputDir);
-    job.setInputFormat(TextInputFormat.class);
+    Job job = new Job(this.conf);
+    job.setJarByClass(PerformanceEvaluation.class);
     job.setJobName("HBase Performance Evaluation");
+    
+    job.setInputFormatClass(PeInputFormat.class);
+    PeInputFormat.setInputPaths(job, inputDir);
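+    // PeInputFormat parses the control file written by writeInputFile()
+    // and turns each matching line into a single-record split, so the
+    // job runs one map task per client.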
+    
+    job.setOutputKeyClass(LongWritable.class);
+    job.setOutputValueClass(LongWritable.class);
+    
     job.setMapperClass(EvaluationMapTask.class);
-    job.setMaxMapAttempts(1);
-    job.setMaxReduceAttempts(1);
-    job.setNumMapTasks(this.N * 10); // Ten maps per client.
+    job.setReducerClass(LongSumReducer.class);
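+    // LongSumReducer sums the map output values per key, yielding one
+    // total elapsed time per client start row.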
+        
     job.setNumReduceTasks(1);
-    job.setOutputFormat(TextOutputFormat.class);
-    FileOutputFormat.setOutputPath(job, new Path(inputDir, "outputs"));
-    JobClient.runJob(job);
+    
+    job.setOutputFormatClass(TextOutputFormat.class);
+    TextOutputFormat.setOutputPath(job, new Path(inputDir, "outputs"));
+    
+    job.waitForCompletion(true);
   }
   
   /*
@@ -666,7 +848,8 @@
     } 
   }
 
-  private void runTest(final String cmd) throws IOException {
+  private void runTest(final String cmd) throws IOException,
+      InterruptedException, ClassNotFoundException {
     if (cmd.equals(RANDOM_READ_MEM)) {
       // For this one test, so all fits in memory, make R smaller (See
       // pg. 9 of BigTable paper).