You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/10/08 17:07:37 UTC

[hbase] branch branch-2 updated: HBASE-26335 Minor improvements to IntegrationTestLoadCommonCrawl (#3731)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new c3c7d36  HBASE-26335 Minor improvements to IntegrationTestLoadCommonCrawl (#3731)
c3c7d36 is described below

commit c3c7d36578ce6dcdc8b21a31117fdd894db6a608
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Fri Oct 8 10:00:51 2021 -0700

    HBASE-26335 Minor improvements to IntegrationTestLoadCommonCrawl (#3731)
    
    - Use BufferedMutator instead of Table.
    - Improve row key generator.
    - Improve retries and log levels.
    
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 .../hbase/test/IntegrationTestLoadCommonCrawl.java | 162 ++++++++++++---------
 1 file changed, 94 insertions(+), 68 deletions(-)

diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
index 9c91796..d98cb8f 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
@@ -156,6 +157,8 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
   protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r");
   protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
 
+  private static final int VERIFICATION_READ_RETRIES = 10;
+
   public static enum Counts {
     REFERENCED, UNREFERENCED, CORRUPT
   }
@@ -516,8 +519,6 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
       job.setOutputValueClass(BytesWritable.class);
       TableMapReduceUtil.addDependencyJars(job);
 
-      LOG.info("Submitting job." +
-        " This will take time proportional to the number of input files, please be patient.");
       boolean success = job.waitForCompletion(true);
       if (!success) {
         LOG.error("Failure during job " + job.getJobID());
@@ -551,18 +552,20 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
 
       Configuration conf;
       Connection conn;
-      Table table;
+      BufferedMutator mutator;
 
       @Override
       protected void setup(Context context) throws IOException, InterruptedException {
-        conn = ConnectionFactory.createConnection(context.getConfiguration());
-        table = conn.getTable(getTablename(conn.getConfiguration()));
+        conf = context.getConfiguration();
+        conn = ConnectionFactory.createConnection(conf);
+        mutator = conn.getBufferedMutator(getTablename(conf));
+        mutator.setWriteBufferPeriodicFlush(10 * 1000); // default is 1 sec, increase to 10
       }
 
       @Override
       protected void cleanup(Context context) throws IOException, InterruptedException {
         try {
-          table.close();
+          mutator.close();
         } catch (Exception e) {
           LOG.warn("Exception closing Table", e);
         }
@@ -582,7 +585,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
         if (warcHeader.getRecordType().equals("response") && targetURI != null) {
           String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
           if (contentType != null) {
-            LOG.debug("Processing record id=" + recordID + ", targetURI=\"" + targetURI + "\"");
+            LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID);
             long now = EnvironmentEdgeManager.currentTime();
 
             // Make row key
@@ -623,7 +626,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
             if (ipAddr != null) {
               put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, now, Bytes.toBytes(ipAddr));
             }
-            table.put(put);
+            mutator.mutate(put);
 
             // Write records out for later verification, one per HBase field except for the
             // content record, which will be verified by CRC64.
@@ -651,10 +654,11 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
       private byte[] rowKeyFromTargetURI(String targetUri)
           throws URISyntaxException, IllegalArgumentException {
         URI uri = new URI(targetUri);
-        StringBuffer sb = new StringBuffer();
         // Ignore the scheme
         // Reverse the components of the hostname
+        String reversedHost;
         if (uri.getHost() != null) {
+          StringBuffer sb = new StringBuffer();
           String[] hostComponents = uri.getHost().split("\\.");
           for (int i = hostComponents.length - 1; i >= 0; i--) {
             sb.append(hostComponents[i]);
@@ -662,28 +666,31 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
               sb.append('.');
             }
           }
+          reversedHost = sb.toString();
         } else {
           throw new IllegalArgumentException("URI is missing host component");
         }
-        // Port
-        if (uri.getPort() != -1) {
+        StringBuffer sb = new StringBuffer();
+        sb.append(reversedHost);
+        if (uri.getPort() >= 0) {
           sb.append(':');
           sb.append(uri.getPort());
         }
-        if (uri.getRawPath() != null) {
-          sb.append(uri.getRawPath());
+        if (uri.getPath() != null) {
+          sb.append('/');
+          sb.append(uri.getPath());
         }
-        if (uri.getRawQuery() != null) {
+        if (uri.getQuery() != null) {
           sb.append('?');
-          sb.append(uri.getRawQuery());
+          sb.append(uri.getQuery());
         }
-        if (uri.getRawFragment() != null) {
+        if (uri.getFragment() != null) {
           sb.append('#');
-          sb.append(uri.getRawFragment());
+          sb.append(uri.getFragment());
         }
-        // Constrain the key size to the maximum allowed row key length
-        if (sb.length() >  HConstants.MAX_ROW_LENGTH) {
-          sb.setLength(HConstants.MAX_ROW_LENGTH);
+        if (sb.length() > HConstants.MAX_ROW_LENGTH) {
+          throw new IllegalArgumentException("Key would be too large (length=" + sb.length() +
+            ", limit=" + HConstants.MAX_ROW_LENGTH);
         }
         return Bytes.toBytes(sb.toString());
       }
@@ -767,73 +774,92 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
         byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(),
           key.getQualifierLength());
         long ts = key.getTimestamp();
+        int retries = VERIFICATION_READ_RETRIES;
 
-        if (Bytes.equals(INFO_FAMILY_NAME, family) &&
-            Bytes.equals(CRC_QUALIFIER, qualifier)) {
+        while (true) {
 
-          long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
+          if (Bytes.equals(INFO_FAMILY_NAME, family) &&
+              Bytes.equals(CRC_QUALIFIER, qualifier)) {
 
-          Result result =
-            table.get(new Get(row)
+            long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
+            Result result = table.get(new Get(row)
               .addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER)
               .addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER)
               .setTimestamp(ts));
-
-          byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
-          if (content == null) {
-            LOG.info("Row " + Bytes.toStringBinary(row) + ": missing content");
-            output.getCounter(Counts.UNREFERENCED).increment(1);
-            return;
-          } else {
-            CRC64 crc = new CRC64();
-            crc.update(content);
-            if (crc.getValue() != expectedCRC64) {
-              LOG.info("Row " + Bytes.toStringBinary(row) + ": corrupt content");
+            byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
+            if (content == null) {
+              if (retries-- > 0) {
+                continue;
+              }
+              LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content");
+              output.getCounter(Counts.UNREFERENCED).increment(1);
+              return;
+            } else {
+              CRC64 crc = new CRC64();
+              crc.update(content);
+              if (crc.getValue() != expectedCRC64) {
+                if (retries-- > 0) {
+                  continue;
+                }
+                LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
+                output.getCounter(Counts.CORRUPT).increment(1);
+                return;
+              }
+            }
+            byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
+            if (crc == null) {
+              if (retries-- > 0) {
+                continue;
+              }
+              LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c");
+              output.getCounter(Counts.UNREFERENCED).increment(1);
+              return;
+            }
+            if (Bytes.toLong(crc) != expectedCRC64) {
+              if (retries-- > 0) {
+                continue;
+              }
+              LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
               output.getCounter(Counts.CORRUPT).increment(1);
               return;
             }
-          }
-          byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
-          if (crc == null) {
-            LOG.info("Row " + Bytes.toStringBinary(row) + ": missing i:c");
-            output.getCounter(Counts.UNREFERENCED).increment(1);
-            return;
-          }
-          if (Bytes.toLong(crc) != expectedCRC64) {
-            LOG.info("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
-            output.getCounter(Counts.CORRUPT).increment(1);
-            return;
-          }
 
-        } else {
+          } else {
 
-          Result result =
-            table.get(new Get(row)
+            Result result = table.get(new Get(row)
               .addColumn(family, qualifier)
               .setTimestamp(ts));
+            byte[] bytes = result.getValue(family, qualifier);
+            if (bytes == null) {
+              if (retries-- > 0) {
+                continue;
+              }
+              LOG.error("Row " + Bytes.toStringBinary(row) + ": missing " +
+                Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
+              output.getCounter(Counts.UNREFERENCED).increment(1);
+              return;
+            }
+            if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
+              if (retries-- > 0) {
+                continue;
+              }
+              LOG.error("Row " + Bytes.toStringBinary(row) + ": " +
+                Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) +
+                " mismatch");
+              output.getCounter(Counts.CORRUPT).increment(1);
+              return;
+            }
 
-          byte[] bytes = result.getValue(family, qualifier);
-          if (bytes == null) {
-            LOG.info("Row " + Bytes.toStringBinary(row) + ": missing " +
-              Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
-            output.getCounter(Counts.UNREFERENCED).increment(1);
-            return;
-          }
-          if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
-            LOG.info("Row " + Bytes.toStringBinary(row) + ": " +
-              Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) +
-              " mismatch");
-            output.getCounter(Counts.CORRUPT).increment(1);
-            return;
           }
 
-        }
+          // If we fell through to here all verification checks have succeeded, potentially after
+          // retries, and we must exit the while loop.
+          output.getCounter(Counts.REFERENCED).increment(1);
+          break;
 
-        output.getCounter(Counts.REFERENCED).increment(1);
+        }
       }
-
     }
-
   }
 
 }