You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/06/27 01:00:39 UTC
[2/2] hbase git commit: Amend HBASE-13639 SyncTable - rsync for HBase tables
Amend HBASE-13639 SyncTable - rsync for HBase tables
Fix compilation issues for Hadoop 1
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/df7ac747
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/df7ac747
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/df7ac747
Branch: refs/heads/0.98
Commit: df7ac74745ab881800d01d48a3a7f05c6a7992f4
Parents: cfb4827
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Jun 26 15:05:02 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jun 26 15:39:35 2015 -0700
----------------------------------------------------------------------
.../lib/output/MapFileOutputFormat.java | 111 +++++++++++++++++++
.../hadoop/hbase/mapreduce/HashTable.java | 7 +-
.../hadoop/hbase/mapreduce/TestHashTable.java | 2 +-
3 files changed, 116 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/df7ac747/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
new file mode 100644
index 0000000..b8cb997
--- /dev/null
+++ b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileUtil;
+
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes
+ * {@link MapFile}s.
+ *
+ * <p>{@link #getRecordWriter(TaskAttemptContext)} creates one MapFile at the
+ * task's default work file; {@link #getReaders(Path, Configuration)} and
+ * {@code getEntry} read such output back.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class MapFileOutputFormat
+    extends FileOutputFormat<WritableComparable<?>, Writable> {
+
+  /**
+   * Accepts only entries whose name does not start with {@code _} or
+   * {@code .}, so that job bookkeeping entries placed next to the part
+   * directories (for example {@code _SUCCESS} or {@code _logs}) are never
+   * opened as MapFiles. Fully qualified to avoid touching the import list.
+   */
+  private static final org.apache.hadoop.fs.PathFilter VISIBLE_ENTRIES =
+      new org.apache.hadoop.fs.PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          String name = path.getName();
+          return !name.startsWith("_") && !name.startsWith(".");
+        }
+      };
+
+  /**
+   * Creates a writer that appends every key/value pair to a single MapFile
+   * located at the task's default work file.
+   *
+   * @param context the task attempt whose configuration, output key/value
+   *        classes and compression settings are used
+   * @return a record writer backed by a {@link MapFile.Writer}
+   * @throws IOException if the file system or the MapFile cannot be opened
+   */
+  @Override
+  public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
+      TaskAttemptContext context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    CompressionCodec codec = null;
+    CompressionType compressionType = CompressionType.NONE;
+    if (getCompressOutput(context)) {
+      // find the kind of compression to do
+      compressionType = SequenceFileOutputFormat.getOutputCompressionType(context);
+
+      // find the right codec
+      Class<? extends CompressionCodec> codecClass =
+          getOutputCompressorClass(context, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, conf);
+    }
+
+    Path file = getDefaultWorkFile(context, "");
+    FileSystem fs = file.getFileSystem(conf);
+    // the task context is handed to the writer as its last argument
+    // (the progress callback in the MapFile.Writer API)
+    final MapFile.Writer out =
+        new MapFile.Writer(conf, fs, file.toString(),
+            context.getOutputKeyClass().asSubclass(WritableComparable.class),
+            context.getOutputValueClass().asSubclass(Writable.class),
+            compressionType, codec, context);
+
+    return new RecordWriter<WritableComparable<?>, Writable>() {
+      @Override
+      public void write(WritableComparable<?> key, Writable value)
+          throws IOException {
+        out.append(key, value);
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) throws IOException {
+        out.close();
+      }
+    };
+  }
+
+  /**
+   * Open the output generated by this format.
+   *
+   * <p>Entries whose name starts with {@code _} or {@code .} are skipped.
+   * The remaining paths are sorted so that the index of a reader in the
+   * returned array lines up with the partition number used by
+   * {@code getEntry}.
+   *
+   * @param dir the directory holding one MapFile per partition
+   * @param conf the configuration used to resolve the file system and to
+   *        open the readers
+   * @return one open reader per visible entry of {@code dir}, in sorted
+   *         path order; the caller is responsible for closing them
+   * @throws IOException if {@code dir} cannot be listed or a MapFile cannot
+   *         be opened; readers opened before the failure are closed
+   */
+  public static MapFile.Reader[] getReaders(Path dir,
+      Configuration conf) throws IOException {
+    FileSystem fs = dir.getFileSystem(conf);
+    Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, VISIBLE_ENTRIES));
+    if (names == null) {
+      // a null listing would otherwise surface as a NullPointerException
+      throw new IOException("Cannot list MapFile output directory: " + dir);
+    }
+
+    // sort names, so that hash partitioning works
+    Arrays.sort(names);
+
+    MapFile.Reader[] parts = new MapFile.Reader[names.length];
+    boolean opened = false;
+    try {
+      for (int i = 0; i < names.length; i++) {
+        parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
+      }
+      opened = true;
+      return parts;
+    } finally {
+      if (!opened) {
+        // do not leak the readers that were opened before the failure
+        for (MapFile.Reader reader : parts) {
+          if (reader != null) {
+            try {
+              reader.close();
+            } catch (IOException ignored) {
+              // best effort: the original failure is the one to report
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Get an entry from output generated by this class.
+   *
+   * @param readers the readers returned by
+   *        {@link #getReaders(Path, Configuration)}
+   * @param partitioner the partitioner used to pick the reader for
+   *        {@code key}; it is called with {@code readers.length} partitions
+   * @param key the key to look up
+   * @param value the object passed to {@link MapFile.Reader#get} to receive
+   *        the entry
+   * @return whatever {@link MapFile.Reader#get} returns for {@code key} on
+   *         the selected reader
+   * @throws IOException if the underlying read fails
+   */
+  public static <K extends WritableComparable<?>, V extends Writable>
+      Writable getEntry(MapFile.Reader[] readers,
+      Partitioner<K, V> partitioner, K key, V value) throws IOException {
+    int part = partitioner.getPartition(key, value, readers.length);
+    return readers[part].get(key, value);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/df7ac747/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
index eea1614..b2d7952 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
@@ -408,7 +407,8 @@ public class HashTable extends Configured implements Tool {
}
Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
- mapFileReader = new MapFile.Reader(dataFile, conf);
+ mapFileReader = new MapFile.Reader(dataFile.getFileSystem(conf), dataFile.toString(),
+ conf);
}
@Override
@@ -446,7 +446,8 @@ public class HashTable extends Configured implements Tool {
job.setNumReduceTasks(tableHash.numHashFiles);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(ImmutableBytesWritable.class);
- job.setOutputFormatClass(MapFileOutputFormat.class);
+ jobConf.set("mapreduce.outputformat.class",
+ "org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat");
FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
return job;
http://git-wip-us.apache.org/repos/asf/hbase/blob/df7ac747/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
index 906a3c7..8d683cb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
@@ -149,7 +149,7 @@ public class TestHashTable {
for (int i = 0; i < numHashFiles; i++) {
Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i));
- MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf());
+ MapFile.Reader reader = new MapFile.Reader(fs, hashPath.toString(), fs.getConf());
ImmutableBytesWritable key = new ImmutableBytesWritable();
ImmutableBytesWritable hash = new ImmutableBytesWritable();
while(reader.next(key, hash)) {