You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/06/27 01:00:39 UTC

[2/2] hbase git commit: Amend HBASE-13639 SyncTable - rsync for HBase tables

Amend HBASE-13639 SyncTable - rsync for HBase tables

Fix compilation issues for Hadoop 1


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/df7ac747
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/df7ac747
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/df7ac747

Branch: refs/heads/0.98
Commit: df7ac74745ab881800d01d48a3a7f05c6a7992f4
Parents: cfb4827
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Jun 26 15:05:02 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jun 26 15:39:35 2015 -0700

----------------------------------------------------------------------
 .../lib/output/MapFileOutputFormat.java         | 111 +++++++++++++++++++
 .../hadoop/hbase/mapreduce/HashTable.java       |   7 +-
 .../hadoop/hbase/mapreduce/TestHashTable.java   |   2 +-
 3 files changed, 116 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/df7ac747/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
new file mode 100644
index 0000000..b8cb997
--- /dev/null
+++ b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileUtil;
+
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/** 
+ * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes 
+ * {@link MapFile}s.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class MapFileOutputFormat 
+    extends FileOutputFormat<WritableComparable<?>, Writable> {
+
+  public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
+      TaskAttemptContext context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    CompressionCodec codec = null;
+    CompressionType compressionType = CompressionType.NONE;
+    if (getCompressOutput(context)) {
+      // find the kind of compression to do
+      compressionType = SequenceFileOutputFormat.getOutputCompressionType(context);
+
+      // find the right codec
+      Class<?> codecClass = getOutputCompressorClass(context,
+	                          DefaultCodec.class);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
+    }
+
+    Path file = getDefaultWorkFile(context, "");
+    FileSystem fs = file.getFileSystem(conf);
+    // ignore the progress parameter, since MapFile is local
+    final MapFile.Writer out =
+      new MapFile.Writer(conf, fs, file.toString(),
+        context.getOutputKeyClass().asSubclass(WritableComparable.class),
+        context.getOutputValueClass().asSubclass(Writable.class),
+        compressionType, codec, context);
+
+    return new RecordWriter<WritableComparable<?>, Writable>() {
+        public void write(WritableComparable<?> key, Writable value)
+            throws IOException {
+          out.append(key, value);
+        }
+
+        public void close(TaskAttemptContext context) throws IOException { 
+          out.close();
+        }
+      };
+  }
+
+  /** Open the output generated by this format. */
+  public static MapFile.Reader[] getReaders(Path dir,
+      Configuration conf) throws IOException {
+    FileSystem fs = dir.getFileSystem(conf);
+    Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
+
+    // sort names, so that hash partitioning works
+    Arrays.sort(names);
+    
+    MapFile.Reader[] parts = new MapFile.Reader[names.length];
+    for (int i = 0; i < names.length; i++) {
+      parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
+    }
+    return parts;
+  }
+    
+  /** Get an entry from output generated by this class. */
+  public static <K extends WritableComparable<?>, V extends Writable>
+      Writable getEntry(MapFile.Reader[] readers, 
+      Partitioner<K, V> partitioner, K key, V value) throws IOException {
+    int part = partitioner.getPartition(key, value, readers.length);
+    return readers[part].get(key, value);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/df7ac747/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
index eea1614..b2d7952 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
@@ -408,7 +407,8 @@ public class HashTable extends Configured implements Tool {
         }
         Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
         Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
-        mapFileReader = new MapFile.Reader(dataFile, conf);
+        mapFileReader = new MapFile.Reader(dataFile.getFileSystem(conf), dataFile.toString(),
+          conf);
       }
 
       @Override
@@ -446,7 +446,8 @@ public class HashTable extends Configured implements Tool {
     job.setNumReduceTasks(tableHash.numHashFiles);
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(ImmutableBytesWritable.class);
-    job.setOutputFormatClass(MapFileOutputFormat.class);
+    jobConf.set("mapreduce.outputformat.class",
+        "org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat");
     FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
     
     return job;

http://git-wip-us.apache.org/repos/asf/hbase/blob/df7ac747/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
index 906a3c7..8d683cb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
@@ -149,7 +149,7 @@ public class TestHashTable {
     for (int i = 0; i < numHashFiles; i++) {
       Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i));
       
-      MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf());
+      MapFile.Reader reader = new MapFile.Reader(fs, hashPath.toString(), fs.getConf());
       ImmutableBytesWritable key = new ImmutableBytesWritable();
       ImmutableBytesWritable hash = new ImmutableBytesWritable();
       while(reader.next(key, hash)) {