You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/08/01 20:18:21 UTC

svn commit: r1509361 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: liyin
Date: Thu Aug  1 18:18:20 2013
New Revision: 1509361

URL: http://svn.apache.org/r1509361
Log:
[master]support incremental hfile in task subdir

Author: fan

Summary: support incremental hfile in task subdir

Test Plan: a unit test for creating subdir path

Reviewers: adela, liyintang, manukranthk, aaiyer

Reviewed By: liyintang

CC: hbase-eng@, vinodv

Differential Revision: https://phabricator.fb.com/D896850

Task ID: 2633302

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1509361&r1=1509360&r2=1509361&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Thu Aug  1 18:18:20 2013
@@ -21,15 +21,12 @@ package org.apache.hadoop.hbase.mapreduc
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -58,13 +55,12 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
 import org.apache.hadoop.hbase.master.RegionPlacement;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -93,6 +89,8 @@ public class HFileOutputFormat extends F
   static final String TABLE_NAME = "hbase.hfileoutputformat.tablename";
   static final String UTF8 = "UTF-8";
 
+  protected static RecordWriter<ImmutableBytesWritable, KeyValue> latestWriter = null;
+
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
     // Get the path of the temporary output file
@@ -100,6 +98,7 @@ public class HFileOutputFormat extends F
     final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
     final Configuration conf = context.getConfiguration();
     final FileSystem fs = outputdir.getFileSystem(conf);
+
     // These configs. are from hbase-*.xml
     final long maxsize = conf.getLong("hbase.hregion.max.filesize",
         HConstants.DEFAULT_MAX_FILE_SIZE);
@@ -125,7 +124,8 @@ public class HFileOutputFormat extends F
     final Pair<byte[][], byte[][]> startKeysAndFavoredNodes = 
       (table == null ? null : table.getStartKeysAndFavoredNodes());
 
-    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
+    RecordWriter<ImmutableBytesWritable, KeyValue> writer =
+        new RecordWriter<ImmutableBytesWritable, KeyValue>() {
       // Map of families to writers and how much has been output on the writer.
       private final Map<byte [], WriterLength> writers =
         new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
@@ -266,6 +266,7 @@ public class HFileOutputFormat extends F
            */
           w.appendMetadata(HConstants.NO_MIN_FLUSH_TIME, 0, false);
           w.close();
+
         }
       }
 
@@ -276,6 +277,9 @@ public class HFileOutputFormat extends F
         }
       }
     };
+
+    latestWriter = writer;
+    return writer;
   }
 
   /*

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1509361&r1=1509360&r2=1509361&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Thu Aug  1 18:18:20 2013
@@ -74,10 +74,13 @@ public class LoadIncrementalHFiles exten
   public static String EXIT_ON_FIRST_FAILURE = "hbase.mapreduce.bulkload.failure.exitOnFirst";
   private boolean exitOnFirstFailure;
 
+  private Configuration conf;
+
   public LoadIncrementalHFiles(Configuration conf) {
     super(conf);
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
     exitOnFirstFailure = conf.getBoolean(EXIT_ON_FIRST_FAILURE, true);
+    this.conf = conf;
   }
 
   public LoadIncrementalHFiles() {

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1509361&r1=1509360&r2=1509361&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Thu Aug  1 18:18:20 2013
@@ -26,16 +26,15 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Map.Entry;
+import java.util.Random;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -46,13 +45,12 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -65,18 +63,14 @@ import org.apache.hadoop.hbase.io.Immuta
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
-import org.apache.hadoop.hbase.master.AssignmentPlan;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.master.RegionPlacement;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -92,8 +86,6 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import com.google.common.collect.Lists;
-
 /**
  * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
  * Sets up and runs a mapreduce job that writes hfile output.
@@ -253,6 +245,50 @@ public class TestHFileOutputFormat  {
     }
   }
 
+  static class SimpleKVMapper
+      extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, KeyValue> {
+
+    static final String testKey = "testKey";
+    static final String testValue = "testValue";
+
+    @Override
+    protected void map (
+        NullWritable n1, NullWritable n2,
+        Mapper<NullWritable, NullWritable,
+            ImmutableBytesWritable, KeyValue>.Context context)
+        throws IOException ,InterruptedException
+    {
+      int taskId = context.getTaskAttemptID().getTaskID().getId();
+      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
+
+      String taskIdString = context.getTaskAttemptID().getTaskID().toString();
+      byte[] keyBytes = Bytes.toBytes(testKey + taskIdString);
+      byte[] valBytes = Bytes.toBytes(testValue);
+      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+
+      for (byte[] family : TestHFileOutputFormat.FAMILIES) {
+        KeyValue kv = new KeyValue(keyBytes, family,
+            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+        context.write(key, kv);
+      }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      HTable table = new HTable(conf, TABLE_NAME);
+
+      Path outputPath = FileOutputFormat.getOutputPath(context);
+      FileSystem fs = outputPath.getFileSystem(conf);
+      Path workOutputPath = FileOutputFormat.getWorkOutputPath(context).makeQualified(fs);
+
+      // Force flushing HFile into working directory
+      HFileOutputFormat.latestWriter.close(context);
+
+      new LoadIncrementalHFiles(conf).doBulkLoad(workOutputPath, table);
+    }
+  };
+
   /**
    * Test for the union style MR jobs that runs both Put and Delete requests
    * @throws Exception on job, sorting, IO or fs errors
@@ -621,6 +657,55 @@ public class TestHFileOutputFormat  {
   }
 
   /**
+   * Test for uploading map output in cleanup stage of each task
+   * @throws Exception
+   */
+  @Test
+  public void testUploadByTask() throws Exception {
+    try {
+      MiniHBaseCluster cluster = util.startMiniCluster();
+      cluster.getMaster();
+      HTable table = util.createTable(TABLE_NAME, FAMILIES);
+      Configuration conf = table.getConfiguration() ;
+      Path testDir = util.getTestDir("testUploadByTask");
+
+      // Generate the bulk load files
+      util.startMiniMapReduceCluster();
+
+      Job job = new Job(conf, "testUploadByTask");
+      job.setInputFormatClass(NMapInputFormat.class);
+      job.setMapperClass(SimpleKVMapper.class);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(KeyValue.class);
+      job.setOutputFormatClass(HFileOutputFormat.class);
+      job.setNumReduceTasks(0);
+      HFileOutputFormat.configAsMapOutputFormat(job, table);
+      FileOutputFormat.setOutputPath(job, testDir);
+      assertTrue(job.waitForCompletion(true));
+
+      // Ensure data shows up
+      int expectedRows = conf.getInt("mapred.map.tasks", 1);
+      assertEquals("LoadIncrementalHFiles should put a row per task",
+          expectedRows, util.countRows(table));
+      Scan scan = new Scan();
+      ResultScanner results = table.getScanner(scan);
+      for (Result res : results) {
+        assertEquals(FAMILIES.length, res.raw().length);
+        for (KeyValue kv : res.raw()) {
+          assertTrue("Key should start with pre-defined test key",
+              Bytes.toStringBinary(kv.getRow()).startsWith(SimpleKVMapper.testKey));
+          assertEquals("Value should equal to pre-defined value",
+              SimpleKVMapper.testValue, Bytes.toString(kv.getValue()));
+        }
+      }
+      results.close();
+    } finally {
+      util.shutdownMiniMapReduceCluster();
+      util.shutdownMiniCluster();
+    }
+  }
+
+  /**
    * Test for
    * {@link HFileOutputFormat#createFamilyCompressionMap(Configuration)}. Tests
    * that the compression map is correctly deserialized from configuration