You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/10/19 20:46:34 UTC

[hbase] branch master updated: HBASE-26349 Improve recent change to IntegrationTestLoadCommonCrawl (#3744)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e73ea8  HBASE-26349 Improve recent change to IntegrationTestLoadCommonCrawl (#3744)
9e73ea8 is described below

commit 9e73ea878d8c846c0ad9957206a1a403d75356b6
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Tue Oct 19 13:45:55 2021 -0700

    HBASE-26349 Improve recent change to IntegrationTestLoadCommonCrawl (#3744)
    
    Use a hybrid logical clock for timestamping entries.
    
    Using BufferedMutator without an HLC was problematic because we assign client-side
    timestamps, and the store loop is fast enough that, on rare occasions, two temporally
    adjacent URLs in the set of WARCs are equivalent and the timestamp does not advance.
    The second write then lands on the same row and timestamp as the first and overwrites
    it, which later leads to a rare false-positive CORRUPT finding during verification.
    
    While making these changes, also support passing S3N paths directly as input paths on
    the command line.
    
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 .../hbase/test/IntegrationTestLoadCommonCrawl.java | 184 ++++++++++++---------
 1 file changed, 104 insertions(+), 80 deletions(-)

diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
index 40bad99..d750145 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
@@ -26,12 +26,12 @@ import java.io.InputStreamReader;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.GZIPInputStream;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -84,7 +84,6 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
 
 /**
@@ -163,17 +162,17 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
     REFERENCED, UNREFERENCED, CORRUPT
   }
 
-  Path warcFileInputDir = null;
-  Path outputDir = null;
-  String[] args;
+  protected Path warcFileInputDir = null;
+  protected Path outputDir = null;
+  protected String[] args;
 
-  protected int runLoader(Path warcFileInputDir, Path outputDir) throws Exception {
+  protected int runLoader(final Path warcFileInputDir, final Path outputDir) throws Exception {
     Loader loader = new Loader();
     loader.setConf(conf);
     return loader.run(warcFileInputDir, outputDir);
   }
 
-  protected int runVerify(Path inputDir) throws Exception {
+  protected int runVerify(final Path inputDir) throws Exception {
     Verify verify = new Verify();
     verify.setConf(conf);
     return verify.run(inputDir);
@@ -208,7 +207,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
   }
 
   @Override
-  protected void processOptions(CommandLine cmd) {
+  protected void processOptions(final CommandLine cmd) {
     processBaseOptions(cmd);
     args = cmd.getArgs();
   }
@@ -232,7 +231,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
     }
   }
 
-  static TableName getTablename(Configuration c) {
+  static TableName getTablename(final Configuration c) {
     return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
   }
 
@@ -421,7 +420,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
     private static final Logger LOG = LoggerFactory.getLogger(Loader.class);
     private static final String USAGE = "Loader <warInputDir | warFileList> <outputDir>";
 
-    void createSchema(TableName tableName) throws IOException {
+    void createSchema(final TableName tableName) throws IOException {
 
       try (Connection conn = ConnectionFactory.createConnection(getConf());
            Admin admin = conn.getAdmin()) {
@@ -477,24 +476,24 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
       }
     }
 
-    int run(Path warcFileInput, Path outputDir)
+    int run(final Path warcFileInput, final Path outputDir)
         throws IOException, ClassNotFoundException, InterruptedException {
 
       createSchema(getTablename(getConf()));
 
-      Job job = Job.getInstance(getConf());
+      final Job job = Job.getInstance(getConf());
       job.setJobName(Loader.class.getName());
       job.setNumReduceTasks(0);
       job.setJarByClass(getClass());
       job.setMapperClass(LoaderMapper.class);
       job.setInputFormatClass(WARCInputFormat.class);
-      FileSystem fs = FileSystem.get(warcFileInput.toUri(), getConf());
+      final FileSystem fs = FileSystem.get(warcFileInput.toUri(), getConf());
       if (fs.getFileStatus(warcFileInput).isDirectory()) {
         LOG.info("Using directory as WARC input path: " + warcFileInput);
         FileInputFormat.setInputPaths(job, warcFileInput);
-      } else {
+      } else if (warcFileInput.toUri().getScheme().equals("file")) {
         LOG.info("Getting WARC input paths from file: " + warcFileInput);
-        List<Path> paths = new LinkedList<Path>();
+        final List<Path> paths = new ArrayList<Path>();
         try (FSDataInputStream is = fs.open(warcFileInput)) {
           InputStreamReader reader;
           if (warcFileInput.getName().toLowerCase().endsWith(".gz")) {
@@ -511,6 +510,8 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
         }
         LOG.info("Read " + paths.size() + " WARC input paths from " + warcFileInput);
         FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
+      } else {
+        FileInputFormat.setInputPaths(job, warcFileInput);
       }
       job.setOutputFormatClass(SequenceFileOutputFormat.class);
       SequenceFileOutputFormat.setOutputPath(job, outputDir);
@@ -550,20 +551,19 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
     public static class LoaderMapper
         extends Mapper<LongWritable, WARCWritable, HBaseKeyWritable, BytesWritable> {
 
-      Configuration conf;
-      Connection conn;
-      BufferedMutator mutator;
+      protected Configuration conf;
+      protected Connection conn;
+      protected BufferedMutator mutator;
 
       @Override
-      protected void setup(Context context) throws IOException, InterruptedException {
+      protected void setup(final Context context) throws IOException, InterruptedException {
         conf = context.getConfiguration();
         conn = ConnectionFactory.createConnection(conf);
         mutator = conn.getBufferedMutator(getTablename(conf));
-        mutator.setWriteBufferPeriodicFlush(10 * 1000); // default is 1 sec, increase to 10
       }
 
       @Override
-      protected void cleanup(Context context) throws IOException, InterruptedException {
+      protected void cleanup(final Context context) throws IOException, InterruptedException {
         try {
           mutator.close();
         } catch (Exception e) {
@@ -577,16 +577,15 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
       }
 
       @Override
-      protected void map(LongWritable key, WARCWritable value, Context output)
+      protected void map(final LongWritable key, final WARCWritable value, final Context output)
           throws IOException, InterruptedException {
-        WARCRecord.Header warcHeader = value.getRecord().getHeader();
-        String recordID = warcHeader.getRecordID();
-        String targetURI = warcHeader.getTargetURI();
+        final WARCRecord.Header warcHeader = value.getRecord().getHeader();
+        final String recordID = warcHeader.getRecordID();
+        final String targetURI = warcHeader.getTargetURI();
         if (warcHeader.getRecordType().equals("response") && targetURI != null) {
-          String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
+          final String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
           if (contentType != null) {
             LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID);
-            long now = EnvironmentEdgeManager.currentTime();
 
             // Make row key
 
@@ -604,62 +603,63 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
 
             // Get the content and calculate the CRC64
 
-            byte[] content = value.getRecord().getContent();
-            CRC64 crc = new CRC64();
+            final byte[] content = value.getRecord().getContent();
+            final CRC64 crc = new CRC64();
             crc.update(content);
-            long crc64 = crc.getValue();
+            final long crc64 = crc.getValue();
 
             // Store to HBase
 
-            Put put = new Put(rowKey);
-            put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, now, content);
-            put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, now,
+            final long ts = getCurrentTime();
+            final Put put = new Put(rowKey);
+            put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, ts, content);
+            put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts,
               Bytes.toBytes(content.length));
-            put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, now,
+            put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts,
               Bytes.toBytes(contentType));
-            put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, now, Bytes.toBytes(crc64));
-            put.addColumn(INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, now, Bytes.toBytes(recordID));
-            put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, now, Bytes.toBytes(targetURI));
-            put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, now,
+            put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, ts, Bytes.toBytes(crc64));
+            put.addColumn(INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, ts, Bytes.toBytes(recordID));
+            put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts, Bytes.toBytes(targetURI));
+            put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, ts,
               Bytes.toBytes(warcHeader.getDateString()));
-            String ipAddr = warcHeader.getField("WARC-IP-Address");
+            final String ipAddr = warcHeader.getField("WARC-IP-Address");
             if (ipAddr != null) {
-              put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, now, Bytes.toBytes(ipAddr));
+              put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr));
             }
             mutator.mutate(put);
 
             // Write records out for later verification, one per HBase field except for the
             // content record, which will be verified by CRC64.
 
-            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, now),
+            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts),
               new BytesWritable(Bytes.toBytes(crc64)));
             output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER,
-              now), new BytesWritable(Bytes.toBytes(content.length)));
+              ts), new BytesWritable(Bytes.toBytes(content.length)));
             output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER,
-              now), new BytesWritable(Bytes.toBytes(contentType)));
+              ts), new BytesWritable(Bytes.toBytes(contentType)));
             output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, RECORD_ID_QUALIFIER,
-              now), new BytesWritable(Bytes.toBytes(recordID)));
+              ts), new BytesWritable(Bytes.toBytes(recordID)));
             output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, TARGET_URI_QUALIFIER,
-              now), new BytesWritable(Bytes.toBytes(targetURI)));
-            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, now),
+              ts), new BytesWritable(Bytes.toBytes(targetURI)));
+            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, ts),
               new BytesWritable(Bytes.toBytes(warcHeader.getDateString())));
             if (ipAddr != null) {
               output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER,
-                now), new BytesWritable(Bytes.toBytes(ipAddr)));
+                ts), new BytesWritable(Bytes.toBytes(ipAddr)));
             }
           }
         }
       }
 
-      private byte[] rowKeyFromTargetURI(String targetUri)
+      private byte[] rowKeyFromTargetURI(final String targetUri)
           throws URISyntaxException, IllegalArgumentException {
-        URI uri = new URI(targetUri);
+        final URI uri = new URI(targetUri);
         // Ignore the scheme
         // Reverse the components of the hostname
         String reversedHost;
         if (uri.getHost() != null) {
-          StringBuffer sb = new StringBuffer();
-          String[] hostComponents = uri.getHost().split("\\.");
+          final StringBuilder sb = new StringBuilder();
+          final String[] hostComponents = uri.getHost().split("\\.");
           for (int i = hostComponents.length - 1; i >= 0; i--) {
             sb.append(hostComponents[i]);
             if (i != 0) {
@@ -670,7 +670,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
         } else {
           throw new IllegalArgumentException("URI is missing host component");
         }
-        StringBuffer sb = new StringBuffer();
+        final StringBuilder sb = new StringBuilder();
         sb.append(reversedHost);
         if (uri.getPort() >= 0) {
           sb.append(':');
@@ -700,7 +700,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
 
   public static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
     @Override
-    protected boolean isSplitable(JobContext context, Path filename) {
+    protected boolean isSplitable(final JobContext context, final Path filename) {
       return false;
     }
   }
@@ -710,7 +710,8 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
     public static final Logger LOG = LoggerFactory.getLogger(Verify.class);
     public static final String USAGE = "Verify <inputDir>";
 
-    int run(Path inputDir) throws IOException, ClassNotFoundException, InterruptedException {
+    int run(final Path inputDir)
+        throws IOException, ClassNotFoundException, InterruptedException {
       Job job = Job.getInstance(getConf());
       job.setJobName(Verify.class.getName());
       job.setJarByClass(getClass());
@@ -725,10 +726,18 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
       if (!success) {
         LOG.error("Failure during job " + job.getJobID());
       }
-      Counters counters = job.getCounters();
+      final Counters counters = job.getCounters();
       for (Counts c: Counts.values()) {
         LOG.info(c + ": " + counters.findCounter(c).getValue());
       }
+      if (counters.findCounter(Counts.UNREFERENCED).getValue() > 0) {
+        LOG.error("Nonzero UNREFERENCED count from job " + job.getJobID());
+        success = false;
+      }
+      if (counters.findCounter(Counts.CORRUPT).getValue() > 0) {
+        LOG.error("Nonzero CORRUPT count from job " + job.getJobID());
+        success = false;
+      }
       return success ? 0 : 1;
     }
 
@@ -749,44 +758,51 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
     public static class VerifyMapper
         extends Mapper<HBaseKeyWritable, BytesWritable, NullWritable, NullWritable> {
 
-      Connection conn;
-      Table table;
+      private Connection conn;
+      private Table table;
 
       @Override
-      protected void setup(Context context) throws IOException, InterruptedException {
+      protected void setup(final Context context) throws IOException, InterruptedException {
         conn = ConnectionFactory.createConnection(context.getConfiguration());
         table = conn.getTable(getTablename(conn.getConfiguration()));
       }
 
       @Override
-      protected void cleanup(Context context) throws IOException ,InterruptedException {
-        table.close();
-        conn.close();
+      protected void cleanup(final Context context) throws IOException, InterruptedException {
+        try {
+          table.close();
+        } catch (Exception e) {
+          LOG.warn("Exception closing Table", e);
+        }
+        try {
+          conn.close();
+        } catch (Exception e) {
+          LOG.warn("Exception closing Connection", e);
+        }
       }
 
       @Override
-      protected void map(HBaseKeyWritable key, BytesWritable value, Context output)
-          throws IOException, InterruptedException {
-
-        byte[] row = Bytes.copy(key.getRowArray(), key.getRowOffset(), key.getRowLength());
-        byte[] family = Bytes.copy(key.getFamilyArray(), key.getFamilyOffset(),
+      protected void map(final HBaseKeyWritable key, final BytesWritable value,
+          final Context output) throws IOException, InterruptedException {
+        final byte[] row = Bytes.copy(key.getRowArray(), key.getRowOffset(), key.getRowLength());
+        final byte[] family = Bytes.copy(key.getFamilyArray(), key.getFamilyOffset(),
           key.getFamilyLength());
-        byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(),
+        final byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(),
           key.getQualifierLength());
-        long ts = key.getTimestamp();
-        int retries = VERIFICATION_READ_RETRIES;
+        final long ts = key.getTimestamp();
 
+        int retries = VERIFICATION_READ_RETRIES;
         while (true) {
 
           if (Bytes.equals(INFO_FAMILY_NAME, family) &&
               Bytes.equals(CRC_QUALIFIER, qualifier)) {
 
-            long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
-            Result result = table.get(new Get(row)
+            final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
+            final Result result = table.get(new Get(row)
               .addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER)
               .addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER)
               .setTimestamp(ts));
-            byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
+            final byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
             if (content == null) {
               if (retries-- > 0) {
                 continue;
@@ -795,18 +811,15 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
               output.getCounter(Counts.UNREFERENCED).increment(1);
               return;
             } else {
-              CRC64 crc = new CRC64();
+              final CRC64 crc = new CRC64();
               crc.update(content);
               if (crc.getValue() != expectedCRC64) {
-                if (retries-- > 0) {
-                  continue;
-                }
                 LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
                 output.getCounter(Counts.CORRUPT).increment(1);
                 return;
               }
             }
-            byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
+            final byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
             if (crc == null) {
               if (retries-- > 0) {
                 continue;
@@ -826,10 +839,10 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
 
           } else {
 
-            Result result = table.get(new Get(row)
+            final Result result = table.get(new Get(row)
               .addColumn(family, qualifier)
               .setTimestamp(ts));
-            byte[] bytes = result.getValue(family, qualifier);
+            final byte[] bytes = result.getValue(family, qualifier);
             if (bytes == null) {
               if (retries-- > 0) {
                 continue;
@@ -862,4 +875,15 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
     }
   }
 
+  private static final AtomicLong counter = new AtomicLong();
+
+  private static long getCurrentTime() {
+    // Typical hybrid logical clock scheme.
+    // Take the current time, shift by 16 bits and zero those bits, and replace those bits
+    // with the low 16 bits of the atomic counter. Mask off the high bit too because timestamps
+    // cannot be negative.
+    return ((EnvironmentEdgeManager.currentTime() << 16) & 0x7fff_ffff_ffff_0000L) |
+        (counter.getAndIncrement() & 0xffffL);
+  }
+
 }