You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/06/12 22:21:12 UTC
svn commit: r1349503 - in
/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis:
./ FilterMeta.java FindTablet.java IndexMeta.java LogFileInputFormat.java
LogFileOutputFormat.java PrintEvents.java package-info.java
Author: kturner
Date: Tue Jun 12 20:21:11 2012
New Revision: 1349503
URL: http://svn.apache.org/viewvc?rev=1349503&view=rev
Log:
ACCUMULO-549 initial checkin of a set of utilities for indexing and analyzing metadata table mutations found in write ahead logs
Added:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/package-info.java
Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FilterMeta.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metanalysis;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.logger.LogEvents;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A map reduce job that takes a set of walogs and filters out all non metadata table events.
+ */
+public class FilterMeta extends Configured implements Tool {
+
+  /** Passes through OPEN events plus DEFINE_TABLET and mutation events of metadata tablets. */
+  public static class FilterMapper extends Mapper<LogFileKey,LogFileValue,LogFileKey,LogFileValue> {
+    private Set<Integer> tabletIds; // ids of tablets defined for the metadata table
+
+    @Override
+    protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException {
+      tabletIds = new HashSet<Integer>();
+    }
+
+    @Override
+    public void map(LogFileKey key, LogFileValue value, Context context) throws IOException, InterruptedException {
+      if (key.event == LogEvents.OPEN) {
+        context.write(key, value);
+      } else if (key.event == LogEvents.DEFINE_TABLET && key.tablet.getTableId().toString().equals(Constants.METADATA_TABLE_ID)) {
+        tabletIds.add(key.tid);
+        context.write(key, value);
+      } else if ((key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) && tabletIds.contains(key.tid)) {
+        context.write(key, value);
+      }
+    }
+  }
+
+  /** Runs the job; args are one or more input walogs followed by the output path. */
+  @Override
+  public int run(String[] args) throws Exception {
+    // need at least one input and the output path, otherwise the array size below is invalid
+    if (args.length < 2) {
+      System.err.println("Usage : " + FilterMeta.class.getName() + " <logfile> {<logfile>} <output>");
+      return -1;
+    }
+
+    Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+    job.setJarByClass(this.getClass());
+
+    Path paths[] = new Path[args.length - 1];
+    for (int i = 0; i < paths.length; i++) {
+      paths[i] = new Path(args[i]);
+    }
+    job.setInputFormatClass(LogFileInputFormat.class);
+    LogFileInputFormat.setInputPaths(job, paths);
+    job.setOutputFormatClass(LogFileOutputFormat.class);
+    LogFileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
+    job.setMapperClass(FilterMapper.class);
+    job.setNumReduceTasks(0);
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(CachedConfiguration.getInstance(), new FilterMeta(), args);
+    System.exit(res);
+  }
+}
Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/FindTablet.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metanalysis;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Finds tablet creation events.
+ */
+public class FindTablet {
+  public static void main(String[] args) throws Exception {
+
+    Options options = new Options();
+    options.addOption("r", "row", true, "find tablets that contain this row");
+
+    GnuParser parser = new GnuParser();
+    CommandLine cmd = null;
+    try {
+      cmd = parser.parse(options, args);
+      if (cmd.getArgs().length != 5) {
+        throw new ParseException("Command takes exactly 5 arguments");
+      }
+    } catch (ParseException e) {
+      System.err.println("Failed to parse command line " + e.getMessage());
+      System.err.println();
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(FindTablet.class.getSimpleName() + " <instance> <zookeepers> <user> <pass> <table ID>", options);
+      System.exit(-1);
+    }
+
+    String instance = cmd.getArgs()[0];
+    String zookeepers = cmd.getArgs()[1];
+    String user = cmd.getArgs()[2];
+    String pass = cmd.getArgs()[3];
+    String tableID = cmd.getArgs()[4];
+
+    ZooKeeperInstance zki = new ZooKeeperInstance(instance, zookeepers);
+    Connector conn = zki.getConnector(user, pass);
+
+    if (cmd.hasOption('r')) {
+      findContainingTablets(conn, tableID, cmd.getOptionValue('r'));
+    } else {
+      System.err.println("ERROR : No search criteria given");
+    }
+  }
+
+  /**
+   * Prints each createEvents entry, within the table's metadata range, whose extent contains row.
+   * @param conn connector used to create the scanner
+   * @param tableID id of the table whose tablets are searched
+   * @param row row that a printed tablet must contain
+   */
+  private static void findContainingTablets(Connector conn, String tableID, String row) throws Exception {
+    Range range = new KeyExtent(new Text(tableID), null, null).toMetadataRange();
+    Text target = new Text(row); // built once instead of once per scanned entry
+
+    Scanner scanner = conn.createScanner("createEvents", new Authorizations());
+    scanner.setRange(range);
+
+    for (Entry<Key,Value> entry : scanner) {
+      KeyExtent ke = new KeyExtent(entry.getKey().getRow(), new Value(TextUtil.getBytes(entry.getKey().getColumnFamily())));
+      if (ke.contains(target)) {
+        System.out.println(entry.getKey().getColumnQualifier() + " " + ke + " " + entry.getValue());
+      }
+    }
+  }
+}
Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metanalysis;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.logger.LogEvents;
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+/**
+ * A map reduce job that takes write ahead logs containing mutations for the metadata table and indexes them into Accumulo tables for analysis.
+ *
+ */
+
+public class IndexMeta extends Configured implements Tool {
+  /** Turns logged metadata table mutations into entries for the createEvents and tabletEvents tables. */
+  public static class IndexMapper extends Mapper<LogFileKey,LogFileValue,Text,Mutation> {
+    private static final Text CREATE_EVENTS_TABLE = new Text("createEvents");
+    private static final Text TABLET_EVENTS_TABLE = new Text("tabletEvents");
+    // tablet id seen in a DEFINE_TABLET event -> extent of that metadata tablet
+    private Map<Integer,KeyExtent> tabletIds = new HashMap<Integer,KeyExtent>();
+    // tserver session from the most recent OPEN event
+    private String uuid = null;
+
+    @Override
+    protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException {
+      uuid = null;
+      tabletIds = new HashMap<Integer,KeyExtent>();
+    }
+
+    @Override
+    public void map(LogFileKey key, LogFileValue value, Context context) throws IOException, InterruptedException {
+      if (key.event == LogEvents.OPEN) {
+        uuid = key.tserverSession;
+        return;
+      }
+      if (key.event == LogEvents.DEFINE_TABLET) {
+        // only tablets of the metadata table are remembered
+        if (key.tablet.getTableId().toString().equals(Constants.METADATA_TABLE_ID)) {
+          tabletIds.put(key.tid, new KeyExtent(key.tablet));
+        }
+        return;
+      }
+      boolean hasMutations = key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS;
+      if (hasMutations && tabletIds.containsKey(key.tid)) {
+        for (Mutation mutation : value.mutations) {
+          index(context, mutation, uuid, tabletIds.get(key.tid));
+        }
+      }
+    }
+
+    /**
+     * Writes one tabletEvents mutation for m and, when m sets the prev row column, one
+     * createEvents mutation as well. Mutations whose row starts with '~' are skipped.
+     */
+    void index(Context context, Mutation m, String logFile, KeyExtent metaTablet) throws IOException, InterruptedException {
+      List<ColumnUpdate> updates = m.getUpdates();
+      byte[] row = m.getRow();
+      if (row.length > 0 && row[0] == '~') {
+        return;
+      }
+
+      Text prevRow = null;
+      long timestamp = 0;
+      for (ColumnUpdate update : updates) {
+        boolean isPrevRow = Constants.METADATA_PREV_ROW_COLUMN.equals(new Text(update.getColumnFamily()), new Text(update.getColumnQualifier()));
+        if (isPrevRow && !update.isDeleted()) {
+          prevRow = new Text(update.getValue());
+        }
+        // the timestamp of the last update is the one used below
+        timestamp = update.getTimestamp();
+      }
+
+      byte[] serMut = WritableUtils.toByteArray(m);
+      String when = String.format("%020d", timestamp);
+
+      if (prevRow != null) {
+        Mutation createEvent = new Mutation(new Text(row));
+        createEvent.put(prevRow, new Text(when), new Value(metaTablet.toString().getBytes()));
+        context.write(CREATE_EVENTS_TABLE, createEvent);
+      }
+
+      Mutation tabletEvent = new Mutation(new Text(row));
+      tabletEvent.put(new Text(when), new Text("mut"), new Value(serMut));
+      tabletEvent.put(new Text(when), new Text("mtab"), new Value(metaTablet.toString().getBytes()));
+      tabletEvent.put(new Text(when), new Text("log"), new Value(logFile.getBytes()));
+      context.write(TABLET_EVENTS_TABLE, tabletEvent);
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 5) {
+      System.err.println("Usage : " + IndexMeta.class + " <instance> <zookeepers> <user> <pass> <logfile> {<logfile>}");
+      return -1;
+    }
+
+    String instance = args[0];
+    String zookeepers = args[1];
+    String user = args[2];
+    String pass = args[3];
+    Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+    job.setJarByClass(this.getClass());
+    // everything after the four connection arguments is a log file
+    Path paths[] = new Path[args.length - 4];
+    for (int i = 0; i < paths.length; i++) {
+      paths[i] = new Path(args[i + 4]);
+    }
+    job.setInputFormatClass(LogFileInputFormat.class);
+    LogFileInputFormat.setInputPaths(job, paths);
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(AccumuloOutputFormat.class);
+    AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(), instance, zookeepers);
+    AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), user, pass.getBytes(), false, null);
+    job.setMapperClass(IndexMapper.class);
+
+    Connector conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, pass);
+    try {
+      conn.tableOperations().create("createEvents");
+    } catch (TableExistsException tee) {
+      Logger.getLogger(IndexMeta.class).warn("Table createEvents exists");
+    }
+
+    try {
+      conn.tableOperations().create("tabletEvents");
+    } catch (TableExistsException tee) {
+      Logger.getLogger(IndexMeta.class).warn("Table tabletEvents exists");
+    }
+
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(CachedConfiguration.getInstance(), new IndexMeta(), args);
+    System.exit(res);
+  }
+}
Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileInputFormat.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.server.metanalysis;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * Input format for Accumulo write ahead logs
+ */
+public class LogFileInputFormat extends FileInputFormat<LogFileKey,LogFileValue> {
+
+  /** Reads consecutive key/value pairs from one write ahead log until end of file. */
+  private static class LogFileRecordReader extends RecordReader<LogFileKey,LogFileValue> {
+    private FSDataInputStream fsdis;
+    private LogFileKey key;
+    private LogFileValue value;
+    private long length; // length of the file being read, taken from its FileStatus
+
+    @Override
+    public void close() throws IOException {
+      // initialize() may have failed before the stream was opened
+      if (fsdis != null) {
+        fsdis.close();
+      }
+    }
+
+    @Override
+    public LogFileKey getCurrentKey() throws IOException, InterruptedException {
+      return key;
+    }
+
+    @Override
+    public LogFileValue getCurrentValue() throws IOException, InterruptedException {
+      return value;
+    }
+
+    /** Returns the fraction of the file consumed so far, capped at 1; 0 for an empty file. */
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      if (length <= 0)
+        return 0;
+      return Math.min(1.0f, fsdis.getPos() / (float) length);
+    }
+
+    @Override
+    public void initialize(InputSplit is, TaskAttemptContext context) throws IOException, InterruptedException {
+      Path path = ((FileSplit) is).getPath();
+      // use the task's configuration and the path's own file system rather than the defaults
+      FileSystem fs = path.getFileSystem(context.getConfiguration());
+
+      key = new LogFileKey();
+      value = new LogFileValue();
+
+      fsdis = fs.open(path);
+      length = fs.getFileStatus(path).getLen();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (key == null)
+        return false;
+
+      try {
+        key.readFields(fsdis);
+        value.readFields(fsdis);
+        return true;
+      } catch (EOFException ex) {
+        // end of file: a null key makes every later call return false
+        key = null;
+        value = null;
+        return false;
+      }
+    }
+  }
+
+  @Override
+  public RecordReader<LogFileKey,LogFileValue> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
+    return new LogFileRecordReader();
+  }
+
+  /** Returns false so that a log file is never divided into several splits. */
+  @Override
+  protected boolean isSplitable(JobContext context, Path filename) {
+    return false;
+  }
+}
Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/LogFileOutputFormat.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metanalysis;
+
+import java.io.IOException;
+
+import org.apache.accumulo.server.logger.LogFileKey;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Output format for Accumulo write ahead logs.
+ */
+public class LogFileOutputFormat extends FileOutputFormat<LogFileKey,LogFileValue> {
+
+  /** Writes each key followed by its value to a single output stream. */
+  private static class LogFileRecordWriter extends RecordWriter<LogFileKey,LogFileValue> {
+
+    private FSDataOutputStream out;
+
+    /**
+     * @param outputPath file to create and write key/value pairs to
+     * @param conf configuration used to resolve the file system that owns outputPath
+     * @throws IOException if the file system cannot be obtained or the file cannot be created
+     */
+    public LogFileRecordWriter(Path outputPath, Configuration conf) throws IOException {
+      // resolve the file system from the path so non-default file systems work
+      FileSystem fs = outputPath.getFileSystem(conf);
+      out = fs.create(outputPath);
+    }
+
+    @Override
+    public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+      out.close();
+    }
+
+    @Override
+    public void write(LogFileKey key, LogFileValue val) throws IOException, InterruptedException {
+      key.write(out);
+      val.write(out);
+    }
+  }
+
+  /** Creates a writer for the task's default work file, using the task's configuration. */
+  @Override
+  public RecordWriter<LogFileKey,LogFileValue> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+    Path outputPath = getDefaultWorkFile(context, "");
+    return new LogFileRecordWriter(outputPath, context.getConfiguration());
+  }
+}
Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/PrintEvents.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metanalysis;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Looks up and prints mutations indexed by IndexMeta
+ */
+public class PrintEvents {
+
+  /**
+   * @param args instance, zookeepers, user, pass, tableId, endRow ("null" for none) and time
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length != 7) {
+      System.err.println("Usage : " + PrintEvents.class.getName() + " <instance> <zookeepers> <user> <pass> <tableId> <endRow> <time>");
+      return;
+    }
+
+    String instance = args[0];
+    String zookeepers = args[1];
+    String user = args[2];
+    String pass = args[3];
+    String tableId = args[4];
+    String endRow = args[5];
+    Long time = Long.parseLong(args[6]);
+
+    ZooKeeperInstance zki = new ZooKeeperInstance(instance, zookeepers);
+    Connector conn = zki.getConnector(user, pass);
+
+    if (endRow.equals("null")) {
+      endRow = null;
+    }
+
+    printEvents(conn, tableId, endRow, time);
+  }
+
+  /**
+   * Prints the mutations indexed in tabletEvents for one tablet, starting at the given time,
+   * and stops after printing a later mutation that touches the prev row column.
+   * @param conn connector used to scan tabletEvents
+   * @param tableId id of the table the tablet belongs to
+   * @param endRow end row of the tablet; null selects the row tableId + "<"
+   * @param time timestamp at which printing starts
+   */
+  private static void printEvents(Connector conn, String tableId, String endRow, Long time) throws Exception {
+    Scanner scanner = conn.createScanner("tabletEvents", new Authorizations());
+    String metaRow = tableId + (endRow == null ? "<" : ";" + endRow);
+    scanner.setRange(new Range(new Key(metaRow, String.format("%020d", time)), true, new Key(metaRow).followingKey(PartialKey.ROW), false));
+    int count = 0;
+
+    String lastLog = null;
+
+    loop1: for (Entry<Key,Value> entry : scanner) {
+      if (entry.getKey().getColumnQualifier().toString().equals("log")) {
+        if (lastLog == null || !lastLog.equals(entry.getValue().toString()))
+          System.out.println("Log : " + entry.getValue());
+        lastLog = entry.getValue().toString();
+      } else if (entry.getKey().getColumnQualifier().toString().equals("mut")) {
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(entry.getValue().get()));
+        Mutation m = new Mutation();
+        m.readFields(dis);
+
+        LogFileValue lfv = new LogFileValue();
+        lfv.mutations = new Mutation[] {m};
+
+        System.out.println(LogFileValue.format(lfv, 1));
+
+        List<ColumnUpdate> columnsUpdates = m.getUpdates();
+        for (ColumnUpdate cu : columnsUpdates) {
+          if (Constants.METADATA_PREV_ROW_COLUMN.equals(new Text(cu.getColumnFamily()), new Text(cu.getColumnQualifier())) && count > 0) {
+            System.out.println("Saw change to prevrow, stopping printing events.");
+            break loop1;
+          }
+        }
+        count++;
+      }
+    }
+  }
+}
Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/package-info.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/package-info.java?rev=1349503&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/package-info.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/package-info.java Tue Jun 12 20:21:11 2012
@@ -0,0 +1,18 @@
+/**
+ * Provides programs to analyze metadata mutations written to write ahead logs.
+ *
+ * <p>
+ * These programs can be used when write ahead logs are archived. The best way to find
+ * which write ahead logs contain metadata mutations is to grep the tablet server logs.
+ * Grep for events where walogs were added to metadata tablets, then take the unique set
+ * of walogs.
+ *
+ * <p>
+ * To use these programs, use IndexMeta to index the metadata mutations in walogs into
+ * Accumulo tables. Then use FindTablet and PrintEvents to analyze those indexes.
+ * FilterMeta allows filtering walogs down to just metadata events. This is useful for the
+ * case where the walogs need to be exported from the cluster for analysis.
+ *
+ * @since 1.5
+ */
+package org.apache.accumulo.server.metanalysis;
\ No newline at end of file