You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/06/12 22:21:12 UTC

svn commit: r1349503 - in /accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis: ./ FilterMeta.java FindTablet.java IndexMeta.java LogFileInputFormat.java LogFileOutputFormat.java PrintEvents.java package-info.java

Author: kturner
Date: Tue Jun 12 20:21:11 2012
New Revision: 1349503

URL: http://svn.apache.org/viewvc?rev=1349503&view=rev
Log:
ACCUMULO-549 initial checkin of a set of utilities for indexing and analyzing metadata table mutations found in write ahead logs

Added:
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/package-info.java

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metanalysis;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.logger.LogEvents;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A map reduce job that takes a set of walogs and filters out all non metadata table events.
+ * 
+ * <p>
+ * Usage: <code>FilterMeta &lt;logfile&gt; {&lt;logfile&gt;} &lt;output dir&gt;</code>. The last argument is the output directory, every argument before it is
+ * an input write ahead log. The job is map only; the events that survive filtering are written with {@link LogFileOutputFormat}.
+ */
+public class FilterMeta extends Configured implements Tool {
+  
+  /**
+   * Passes through OPEN events, DEFINE_TABLET events for tablets of the metadata table, and mutation events whose tablet id was introduced by one of those
+   * DEFINE_TABLET events. Everything else is dropped.
+   */
+  public static class FilterMapper extends Mapper<LogFileKey,LogFileValue,LogFileKey,LogFileValue> {
+    // tablet ids (LogFileKey.tid) of the metadata tablets defined so far in this mapper's input
+    private Set<Integer> tabletIds;
+    
+    @Override
+    protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException {
+      tabletIds = new HashSet<Integer>();
+    }
+    
+    @Override
+    public void map(LogFileKey key, LogFileValue value, Context context) throws IOException, InterruptedException {
+      if (key.event == LogEvents.OPEN) {
+        // always keep the OPEN event
+        context.write(key, value);
+      } else if (key.event == LogEvents.DEFINE_TABLET && key.tablet.getTableId().toString().equals(Constants.METADATA_TABLE_ID)) {
+        // remember the id so that later mutations for this metadata tablet are kept
+        tabletIds.add(key.tid);
+        context.write(key, value);
+      } else if ((key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) && tabletIds.contains(key.tid)) {
+        context.write(key, value);
+      }
+    }
+  }
+
+  /**
+   * Configures and runs the filtering job.
+   * 
+   * @param args
+   *          one or more input log files followed by the output directory
+   * @return 0 if the job succeeded, 1 if it failed, -1 if the arguments were invalid
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    // at least one input log and the output directory are required; without this check an empty
+    // argument list fails with NegativeArraySizeException and a single argument leaves the job with no input
+    if (args.length < 2) {
+      System.err.println("Usage : " + FilterMeta.class.getName() + " <logfile> {<logfile>} <output dir>");
+      return -1;
+    }
+    
+    String jobName = this.getClass().getSimpleName() + "_" + System.currentTimeMillis();
+    
+    Job job = new Job(getConf(), jobName);
+    job.setJarByClass(this.getClass());
+    
+    // every argument except the last one is an input log
+    Path[] paths = new Path[args.length - 1];
+    for (int i = 0; i < paths.length; i++) {
+      paths[i] = new Path(args[i]);
+    }
+
+    job.setInputFormatClass(LogFileInputFormat.class);
+    LogFileInputFormat.setInputPaths(job, paths);
+    
+    job.setOutputFormatClass(LogFileOutputFormat.class);
+    LogFileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
+
+    job.setMapperClass(FilterMapper.class);
+    
+    // map only job, the mapper output is the final output
+    job.setNumReduceTasks(0);
+
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+  
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(CachedConfiguration.getInstance(), new FilterMeta(), args);
+    System.exit(res);
+  }
+}

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metanalysis;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Finds tablet creation events by scanning the createEvents table, which {@link IndexMeta} populates.
+ * 
+ * <p>
+ * Usage: <code>FindTablet [-r &lt;row&gt;] &lt;instance&gt; &lt;zookeepers&gt; &lt;user&gt; &lt;pass&gt; &lt;table ID&gt;</code>
+ */
+public class FindTablet {
+  public static void main(String[] args) throws Exception {
+    
+    Options options = new Options();
+    options.addOption("r", "row", true, "find tablets that contain this row");
+    
+    GnuParser parser = new GnuParser();
+    CommandLine cmd = null;
+    try {
+      cmd = parser.parse(options, args);
+      // exactly five positional arguments are required (see the help text printed below)
+      if (cmd.getArgs().length != 5) {
+        throw new ParseException("Command takes exactly 5 arguments");
+      }
+    } catch (ParseException e) {
+      System.err.println("Failed to parse command line " + e.getMessage());
+      System.err.println();
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(FindTablet.class.getSimpleName() + " <instance> <zookeepers> <user> <pass> <table ID>", options);
+      System.exit(-1);
+    }
+    
+    String instance = cmd.getArgs()[0];
+    String zookeepers = cmd.getArgs()[1];
+    String user = cmd.getArgs()[2];
+    String pass = cmd.getArgs()[3];
+    String tableID = cmd.getArgs()[4];
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(instance, zookeepers);
+    Connector conn = zki.getConnector(user, pass);
+    
+    if (cmd.hasOption('r')) {
+      findContainingTablets(conn, tableID, cmd.getOptionValue('r'));
+    } else {
+      System.err.println("ERROR :  No search criteria given");
+    }
+  }
+
+  /**
+   * Scans the createEvents table over the metadata range of the given table and prints every entry whose tablet extent contains the given row. Each printed
+   * line holds the entry's column qualifier, the tablet extent, and the entry's value.
+   * 
+   * @param conn
+   *          connector used to scan the createEvents table
+   * @param tableID
+   *          id of the table whose tablets are searched
+   * @param row
+   *          row that a tablet must contain to be printed
+   */
+  private static void findContainingTablets(Connector conn, String tableID, String row) throws Exception {
+    Range range = new KeyExtent(new Text(tableID), null, null).toMetadataRange();
+
+    Scanner scanner = conn.createScanner("createEvents", new Authorizations());
+    
+    scanner.setRange(range);
+    
+    for (Entry<Key,Value> entry : scanner) {
+      // the extent is rebuilt from the entry's row plus its column family, which IndexMeta fills with the prev row value
+      KeyExtent ke = new KeyExtent(entry.getKey().getRow(), new Value(TextUtil.getBytes(entry.getKey().getColumnFamily())));
+      if (ke.contains(new Text(row))) {
+        System.out.println(entry.getKey().getColumnQualifier() + " " + ke + " " + entry.getValue());
+      }
+    }
+  }
+}

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metanalysis;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.logger.LogEvents;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+/**
+ * Map reduce job that indexes metadata table mutations found in write ahead logs. The mutations are written to the Accumulo tables createEvents and
+ * tabletEvents so that they can be analyzed later.
+ */
+
+public class IndexMeta extends Configured implements Tool {
+  
+  /**
+   * Tracks which tablet ids in a log belong to the metadata table and indexes every mutation logged against one of them.
+   */
+  public static class IndexMapper extends Mapper<LogFileKey,LogFileValue,Text,Mutation> {
+    private static final Text CREATE_EVENTS_TABLE = new Text("createEvents");
+    private static final Text TABLET_EVENTS_TABLE = new Text("tabletEvents");
+    // tablet id -> extent of the metadata tablet it was defined for
+    private Map<Integer,KeyExtent> tabletIds = new HashMap<Integer,KeyExtent>();
+    // tserver session recorded by the most recent OPEN event
+    private String uuid = null;
+    
+    @Override
+    protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException {
+      uuid = null;
+      tabletIds = new HashMap<Integer,KeyExtent>();
+    }
+    
+    @Override
+    public void map(LogFileKey key, LogFileValue value, Context context) throws IOException, InterruptedException {
+      if (key.event == LogEvents.OPEN) {
+        uuid = key.tserverSession;
+        return;
+      }
+      
+      if (key.event == LogEvents.DEFINE_TABLET) {
+        boolean isMetadataTablet = key.tablet.getTableId().toString().equals(Constants.METADATA_TABLE_ID);
+        if (isMetadataTablet) {
+          tabletIds.put(key.tid, new KeyExtent(key.tablet));
+        }
+        return;
+      }
+      
+      // only mutations logged against a known metadata tablet are indexed
+      boolean carriesMutations = key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS;
+      if (!carriesMutations || !tabletIds.containsKey(key.tid)) {
+        return;
+      }
+      
+      for (Mutation mutation : value.mutations) {
+        index(context, mutation, uuid, tabletIds.get(key.tid));
+      }
+    }
+    
+    /**
+     * Writes index entries for one metadata mutation. A tabletEvents entry is always written; a createEvents entry is written as well when the mutation sets
+     * (rather than deletes) the prev row column. Mutations whose row starts with '~' are skipped.
+     * 
+     * @param context
+     *          context the index mutations are written to
+     * @param m
+     *          the metadata mutation to index
+     * @param logFile
+     *          value stored under the "log" qualifier of the tabletEvents entry
+     * @param metaTablet
+     *          extent of the metadata tablet the mutation was logged for
+     */
+    void index(Context context, Mutation m, String logFile, KeyExtent metaTablet) throws IOException, InterruptedException {
+      List<ColumnUpdate> updates = m.getUpdates();
+      
+      byte[] rowBytes = m.getRow();
+      if (rowBytes.length > 0 && rowBytes[0] == '~') {
+        return;
+      }
+      
+      Text newPrevRow = null;
+      long lastTimestamp = 0;
+      for (ColumnUpdate update : updates) {
+        Text family = new Text(update.getColumnFamily());
+        Text qualifier = new Text(update.getColumnQualifier());
+        if (Constants.METADATA_PREV_ROW_COLUMN.equals(family, qualifier) && !update.isDeleted()) {
+          newPrevRow = new Text(update.getValue());
+        }
+        
+        // the timestamp of the last column update is the one used for the index entries
+        lastTimestamp = update.getTimestamp();
+      }
+      
+      byte[] serializedMutation = WritableUtils.toByteArray(m);
+      // zero padded so that the timestamps sort numerically
+      String formattedTime = String.format("%020d", lastTimestamp);
+      
+      if (newPrevRow != null) {
+        Mutation createEvent = new Mutation(new Text(m.getRow()));
+        createEvent.put(newPrevRow, new Text(formattedTime), new Value(metaTablet.toString().getBytes()));
+        context.write(CREATE_EVENTS_TABLE, createEvent);
+      }
+      
+      Mutation tabletEvent = new Mutation(new Text(m.getRow()));
+      tabletEvent.put(new Text(formattedTime), new Text("mut"), new Value(serializedMutation));
+      tabletEvent.put(new Text(formattedTime), new Text("mtab"), new Value(metaTablet.toString().getBytes()));
+      tabletEvent.put(new Text(formattedTime), new Text("log"), new Value(logFile.getBytes()));
+      context.write(TABLET_EVENTS_TABLE, tabletEvent);
+    }
+  }
+
+  /**
+   * Creates the named table, logging a warning instead of failing when it already exists.
+   */
+  private static void createTableIfAbsent(Connector conn, String tableName) throws Exception {
+    try {
+      conn.tableOperations().create(tableName);
+    } catch (TableExistsException tee) {
+      Logger.getLogger(IndexMeta.class).warn("Table " + tableName + " exists");
+    }
+  }
+  
+  /**
+   * Configures and runs the indexing job.
+   * 
+   * @param args
+   *          instance, zookeepers, user, password, followed by one or more log files
+   * @return 0 if the job succeeded, 1 if it failed, -1 if too few arguments were given
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 5) {
+      System.err.println("Usage : " + IndexMeta.class + " <instance> <zookeepers> <user> <pass> <logfile> {<logfile>}");
+      return -1;
+    }
+    
+    final String instance = args[0];
+    final String zookeepers = args[1];
+    final String user = args[2];
+    final String pass = args[3];
+
+    Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+    job.setJarByClass(this.getClass());
+
+    // everything after the four connection arguments is an input log file
+    final int firstLogFile = 4;
+    Path[] paths = new Path[args.length - firstLogFile];
+    for (int i = firstLogFile; i < args.length; i++) {
+      paths[i - firstLogFile] = new Path(args[i]);
+    }
+    
+    job.setInputFormatClass(LogFileInputFormat.class);
+    LogFileInputFormat.setInputPaths(job, paths);
+    
+    job.setNumReduceTasks(0);
+    
+    job.setOutputFormatClass(AccumuloOutputFormat.class);
+    AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(), instance, zookeepers);
+    AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), user, pass.getBytes(), false, null);
+    
+    job.setMapperClass(IndexMapper.class);
+
+    Connector conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, pass);
+    
+    // the output tables are created up front, before the job is started
+    createTableIfAbsent(conn, "createEvents");
+    createTableIfAbsent(conn, "tabletEvents");
+    
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+  
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(CachedConfiguration.getInstance(), new IndexMeta(), args));
+  }
+}

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.server.metanalysis;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * Input format for Accumulo write ahead logs. Each log file is read by a single record reader (files are never split) that deserializes
+ * {@link LogFileKey}/{@link LogFileValue} pairs until the end of the file is reached.
+ */
+public class LogFileInputFormat extends FileInputFormat<LogFileKey,LogFileValue> {
+  
+  private static class LogFileRecordReader extends RecordReader<LogFileKey,LogFileValue> {
+    
+    private FSDataInputStream fsdis;
+    // key and value are reused for every record and set to null once the end of the file is reached
+    private LogFileKey key;
+    private LogFileValue value;
+    // length of the log file in bytes, used to compute progress
+    private long length;
+    
+    @Override
+    public void close() throws IOException {
+      // the stream is null when initialize() failed before opening the file
+      if (fsdis != null) {
+        fsdis.close();
+      }
+    }
+    
+    @Override
+    public LogFileKey getCurrentKey() throws IOException, InterruptedException {
+      return key;
+    }
+    
+    @Override
+    public LogFileValue getCurrentValue() throws IOException, InterruptedException {
+      return value;
+    }
+    
+    /**
+     * @return the fraction of the file that has been read so far, between 0.0 and 1.0
+     */
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      if (fsdis == null) {
+        return 0.0f;
+      }
+      if (key == null) {
+        // nextKeyValue() hit the end of the file, everything was consumed
+        return 1.0f;
+      }
+      if (length <= 0) {
+        // avoid dividing by zero for an empty file
+        return 0.0f;
+      }
+      return Math.min(1.0f, fsdis.getPos() / (float) length);
+    }
+    
+    @Override
+    public void initialize(InputSplit is, TaskAttemptContext context) throws IOException, InterruptedException {
+      FileSplit fileSplit = (FileSplit) is;
+      Path path = fileSplit.getPath();
+      
+      // use the task's configuration and the file system the path belongs to, so that job level
+      // settings and non default file systems are honored
+      Configuration conf = context.getConfiguration();
+      FileSystem fs = path.getFileSystem(conf);
+      
+      key = new LogFileKey();
+      value = new LogFileValue();
+      
+      FileStatus status = fs.getFileStatus(path);
+      length = status.getLen();
+      fsdis = fs.open(path);
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (key == null)
+        return false;
+      
+      try {
+        key.readFields(fsdis);
+        value.readFields(fsdis);
+        return true;
+      } catch (EOFException ex) {
+        // end of the log; null key/value mark the reader as exhausted
+        key = null;
+        value = null;
+        return false;
+      }
+    }
+    
+  }
+
+  
+  @Override
+  public RecordReader<LogFileKey,LogFileValue> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
+    return new LogFileRecordReader();
+  }
+  
+  /**
+   * @return false always; the record reader reads each log from its beginning to its end
+   */
+  @Override
+  protected boolean isSplitable(JobContext context, Path filename) {
+    return false;
+  }
+  
+}

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metanalysis;
+
+import java.io.IOException;
+
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Output format for Accumulo write ahead logs. Every key/value pair is serialized back to back, key first, into the task's default work file.
+ */
+public class LogFileOutputFormat extends FileOutputFormat<LogFileKey,LogFileValue> {
+  
+  private static class LogFileRecordWriter extends RecordWriter<LogFileKey,LogFileValue> {
+    
+    private FSDataOutputStream out;
+    
+    /**
+     * Creates the output file.
+     * 
+     * @param outputPath
+     *          file to create
+     * @param conf
+     *          configuration used to resolve the file system of outputPath
+     * @throws IOException
+     *           if the file system cannot be obtained or the file cannot be created
+     */
+    public LogFileRecordWriter(Path outputPath, Configuration conf) throws IOException {
+      // resolve the file system from the path itself so that non default file systems work
+      FileSystem fs = outputPath.getFileSystem(conf);
+      
+      out = fs.create(outputPath);
+    }
+
+    @Override
+    public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+      out.close();
+    }
+    
+    @Override
+    public void write(LogFileKey key, LogFileValue val) throws IOException, InterruptedException {
+      key.write(out);
+      val.write(out);
+    }
+    
+  }
+
+  @Override
+  public RecordWriter<LogFileKey,LogFileValue> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+    Path outputPath = getDefaultWorkFile(context, "");
+    // pass the task's configuration along instead of building a fresh default one
+    return new LogFileRecordWriter(outputPath, context.getConfiguration());
+  }
+  
+}

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metanalysis;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Looks up and prints mutations indexed by IndexMeta
+ */
+public class PrintEvents {
+  
+  /**
+   * @param args
+   *          instance, zookeepers, user, password, table id, end row (the literal string "null" means no end row), and the time to start printing from
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length != 7) {
+      System.err.println("Usage : " + PrintEvents.class + " <instance> <zookeepers> <user> <pass> <tableId> <endRow> <time>");
+      return;
+    }
+    
+    String instance = args[0];
+    String zookeepers = args[1];
+    String user = args[2];
+    String pass = args[3];
+    String tableId = args[4];
+    String endRow = args[5];
+    long time = Long.parseLong(args[6]);
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(instance, zookeepers);
+    Connector conn = zki.getConnector(user, pass);
+    
+    if (endRow.equals("null")) {
+      endRow = null;
+    }
+
+    printEvents(conn, tableId, endRow, time);
+  }
+  
+  /**
+   * Scans the tabletEvents table for one tablet, starting at the given time, and prints the indexed mutations along with the log each one came from.
+   * Printing stops after a mutation other than the first printed one touches the prev row column.
+   * 
+   * @param conn
+   *          connector used to scan the tabletEvents table
+   * @param tableId
+   *          id of the table the tablet belongs to
+   * @param endRow
+   *          end row of the tablet, or null for the tablet without an end row
+   * @param time
+   *          events with a smaller time are not printed
+   */
+  private static void printEvents(Connector conn, String tableId, String endRow, long time) throws Exception {
+    Scanner scanner = conn.createScanner("tabletEvents", new Authorizations());
+    // row of the tablet's entry in the metadata table
+    String metaRow = tableId + (endRow == null ? "<" : ";" + endRow);
+    // the column family holds the zero padded time, so the range starts at the requested time and ends with the row
+    scanner.setRange(new Range(new Key(metaRow, String.format("%020d", time)), true, new Key(metaRow).followingKey(PartialKey.ROW), false));
+    // number of mutations printed so far
+    int count = 0;
+    
+    String lastLog = null;
+
+    loop1: for (Entry<Key,Value> entry : scanner) {
+      if (entry.getKey().getColumnQualifier().toString().equals("log")) {
+        // only print the log when it differs from the previously seen one
+        if (lastLog == null || !lastLog.equals(entry.getValue().toString()))
+          System.out.println("Log : " + entry.getValue());
+        lastLog = entry.getValue().toString();
+      } else if (entry.getKey().getColumnQualifier().toString().equals("mut")) {
+        // the value is the serialized mutation
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(entry.getValue().get()));
+        Mutation m = new Mutation();
+        m.readFields(dis);
+        
+        LogFileValue lfv = new LogFileValue();
+        lfv.mutations = new Mutation[] {m};
+        
+        System.out.println(LogFileValue.format(lfv, 1));
+        
+        List<ColumnUpdate> columnsUpdates = m.getUpdates();
+        for (ColumnUpdate cu : columnsUpdates) {
+          // a prev row change in any mutation but the first printed one ends the output
+          if (Constants.METADATA_PREV_ROW_COLUMN.equals(new Text(cu.getColumnFamily()), new Text(cu.getColumnQualifier())) && count > 0) {
+            System.out.println("Saw change to prevrow, stopping printing events.");
+            break loop1;
+          }
+        }
+        count++;
+      }
+    }
+
+  }
+}

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/package-info.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/package-info.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/package-info.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/package-info.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,18 @@
+/**
+ * Provides programs to analyze metadata mutations written to write ahead logs.  
+ * 
+ * <p>
+ * These programs can be used when write ahead logs are archived.   The best way to find
+ * which write ahead logs contain metadata mutations is to grep the tablet server logs.  
+ * Grep for events where walogs were added to metadata tablets, then take the unique set 
+ * of walogs.
+ *
+ * <p>
+ * To use these programs, use IndexMeta to index the metadata mutations in walogs into 
+ * Accumulo tables.  Then use FindTablet and PrintEvents to analyze those indexes.  
+ * FilterMeta allows filtering walogs down to just metadata events.  This is useful for the
+ * case where the walogs need to be exported from the cluster for analysis.
+ *
+ * @since 1.5
+ */
+package org.apache.accumulo.server.metanalysis;
\ No newline at end of file