You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-issues@hadoop.apache.org by "rulinma (JIRA)" <ji...@apache.org> on 2013/07/31 04:27:48 UTC

[jira] [Commented] (MAPREDUCE-5433) use mapreduce to parse hfiles and output keyvalue

    [ https://issues.apache.org/jira/browse/MAPREDUCE-5433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13724769#comment-13724769 ] 

rulinma commented on MAPREDUCE-5433:
------------------------------------

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * A {@link FileInputFormat} that reads HBase HFiles directly and hands every
 * stored {@link KeyValue} to the mapper, keyed by the row of that KeyValue.
 *
 * <p>Each HFile is processed as a single, unsplit input (see
 * {@link #isSplitable(JobContext, Path)}).
 */
public class HFileInputFormat extends
		FileInputFormat<ImmutableBytesWritable, KeyValue> {

	/**
	 * Record reader that walks one HFile from its first entry to its last.
	 *
	 * <p>The file is opened and the scanner is positioned in the constructor;
	 * {@link #initialize(InputSplit, TaskAttemptContext)} is therefore a no-op.
	 * The class is static because it needs nothing from the enclosing input
	 * format instance.
	 */
	private static class HFileRecordReader extends
			RecordReader<ImmutableBytesWritable, KeyValue> {
		private final HFile.Reader reader;
		private final HFileScanner scanner;
		/** Result of the initial {@code seekTo()}: false means no first entry. */
		private final boolean firstEntryAvailable;
		/** True once {@link #nextKeyValue()} has been called at least once. */
		private boolean started = false;
		/** True once the scanner has reported that no entries remain. */
		private boolean exhausted = false;
		/** Number of entries successfully handed out so far. */
		private long entriesRead = 0;

		/**
		 * Opens the HFile behind {@code split} and positions a scanner on its
		 * first entry.
		 *
		 * @param split the file split to read; only its path is used
		 * @param conf  job configuration used to resolve the file system and
		 *              to build the cache configuration
		 * @throws IOException if the file cannot be opened or scanned
		 */
		public HFileRecordReader(FileSplit split, Configuration conf)
				throws IOException {
			final Path path = split.getPath();
			// Resolve the file system from the path itself so that files that
			// do not live on the default file system can still be read.
			final FileSystem fs = path.getFileSystem(conf);
			final HFile.Reader openedReader = HFile.createReader(fs, path,
					new CacheConfig(conf));
			HFileScanner openedScanner = null;
			boolean positioned = false;
			boolean success = false;
			try {
				openedScanner = openedReader.getScanner(false, false, false);
				positioned = openedScanner.seekTo();
				success = true;
			} finally {
				// Do not leak the open reader if setting up the scanner failed.
				if (!success) {
					openedReader.close();
				}
			}
			reader = openedReader;
			scanner = openedScanner;
			firstEntryAvailable = positioned;
		}

		/** Closes the underlying HFile reader. */
		@Override
		public void close() throws IOException {
			reader.close();
		}

		/**
		 * @return the row of the KeyValue the scanner is currently positioned
		 *         on, wrapped in a new {@link ImmutableBytesWritable}
		 */
		@Override
		public ImmutableBytesWritable getCurrentKey() throws IOException,
				InterruptedException {
			return new ImmutableBytesWritable(scanner.getKeyValue().getRow());
		}

		/** @return the KeyValue the scanner is currently positioned on */
		@Override
		public KeyValue getCurrentValue() throws IOException,
				InterruptedException {
			return scanner.getKeyValue();
		}

		/**
		 * Advances to the next entry.
		 *
		 * <p>The constructor already positioned the scanner on the first
		 * entry, so the first call only reports whether that positioning
		 * found an entry; later calls advance the scanner.
		 *
		 * @return true if an entry is available through
		 *         {@link #getCurrentKey()} / {@link #getCurrentValue()}
		 */
		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {
			if (exhausted) {
				return false;
			}
			final boolean hasEntry;
			if (!started) {
				started = true;
				hasEntry = firstEntryAvailable;
			} else {
				hasEntry = scanner.next();
			}
			if (hasEntry) {
				entriesRead++;
			} else {
				// Remember the end so the scanner is not advanced again.
				exhausted = true;
			}
			return hasEntry;
		}

		/**
		 * @return the fraction of entries read so far, between 0 and 1; a file
		 *         that reports no entries is treated as fully processed
		 */
		@Override
		public float getProgress() throws IOException, InterruptedException {
			final long totalEntries = reader.getEntries();
			if (totalEntries <= 0) {
				return 1.0f;
			}
			// Divide in floating point; integer division would truncate to 0.
			return Math.min(1.0f, (float) entriesRead / totalEntries);
		}

		/**
		 * No-op: all setup is done in the constructor, which receives the
		 * split and configuration directly.
		 */
		@Override
		public void initialize(InputSplit arg0, TaskAttemptContext arg1)
				throws IOException, InterruptedException {
		}

	}

	/**
	 * HFiles are never split; each file is read by exactly one record reader.
	 *
	 * @return always false
	 */
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}

	/**
	 * Creates a record reader for the given split.
	 *
	 * @param split   the input split; must be a {@link FileSplit}
	 * @param context task context supplying the configuration
	 * @return a reader positioned at the start of the split's HFile
	 * @throws IOException if the HFile cannot be opened
	 */
	@Override
	public RecordReader<ImmutableBytesWritable, KeyValue> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {
		return new HFileRecordReader((FileSplit) split,
				context.getConfiguration());
	}

}
                
> use mapreduce to parse hfiles and output keyvalue
> -------------------------------------------------
>
>                 Key: MAPREDUCE-5433
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5433
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: examples
>            Reporter: rulinma
>            Assignee: rulinma
>


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira