You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/10/08 17:01:25 UTC
[hbase] branch master updated: HBASE-26335 Minor improvements to IntegrationTestLoadCommonCrawl (#3731)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new a384c23 HBASE-26335 Minor improvements to IntegrationTestLoadCommonCrawl (#3731)
a384c23 is described below
commit a384c239b9c5462c16ded4f74fc5dd7b3e1b1682
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Fri Oct 8 10:00:51 2021 -0700
HBASE-26335 Minor improvements to IntegrationTestLoadCommonCrawl (#3731)
HBASE-26335 Minor improvements to IntegrationTestLoadCommonCrawl
- Use BufferedMutator instead of Table.
- Improve row key generator.
- Improve retries and log levels.
Signed-off-by: Viraj Jasani <vj...@apache.org>
---
.../hbase/test/IntegrationTestLoadCommonCrawl.java | 162 ++++++++++++---------
1 file changed, 94 insertions(+), 68 deletions(-)
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
index 5dcd84e..40bad99 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
@@ -156,6 +157,8 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r");
protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
+ private static final int VERIFICATION_READ_RETRIES = 10;
+
public static enum Counts {
REFERENCED, UNREFERENCED, CORRUPT
}
@@ -516,8 +519,6 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
job.setOutputValueClass(BytesWritable.class);
TableMapReduceUtil.addDependencyJars(job);
- LOG.info("Submitting job." +
- " This will take time proportional to the number of input files, please be patient.");
boolean success = job.waitForCompletion(true);
if (!success) {
LOG.error("Failure during job " + job.getJobID());
@@ -551,18 +552,20 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
Configuration conf;
Connection conn;
- Table table;
+ BufferedMutator mutator;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
- conn = ConnectionFactory.createConnection(context.getConfiguration());
- table = conn.getTable(getTablename(conn.getConfiguration()));
+ conf = context.getConfiguration();
+ conn = ConnectionFactory.createConnection(conf);
+ mutator = conn.getBufferedMutator(getTablename(conf));
+ mutator.setWriteBufferPeriodicFlush(10 * 1000); // default is 1 sec, increase to 10
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
try {
- table.close();
+ mutator.close();
} catch (Exception e) {
LOG.warn("Exception closing Table", e);
}
@@ -582,7 +585,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
if (warcHeader.getRecordType().equals("response") && targetURI != null) {
String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
if (contentType != null) {
- LOG.debug("Processing record id=" + recordID + ", targetURI=\"" + targetURI + "\"");
+ LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID);
long now = EnvironmentEdgeManager.currentTime();
// Make row key
@@ -623,7 +626,7 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
if (ipAddr != null) {
put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, now, Bytes.toBytes(ipAddr));
}
- table.put(put);
+ mutator.mutate(put);
// Write records out for later verification, one per HBase field except for the
// content record, which will be verified by CRC64.
@@ -651,10 +654,11 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
private byte[] rowKeyFromTargetURI(String targetUri)
throws URISyntaxException, IllegalArgumentException {
URI uri = new URI(targetUri);
- StringBuffer sb = new StringBuffer();
// Ignore the scheme
// Reverse the components of the hostname
+ String reversedHost;
if (uri.getHost() != null) {
+ StringBuffer sb = new StringBuffer();
String[] hostComponents = uri.getHost().split("\\.");
for (int i = hostComponents.length - 1; i >= 0; i--) {
sb.append(hostComponents[i]);
@@ -662,28 +666,31 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
sb.append('.');
}
}
+ reversedHost = sb.toString();
} else {
throw new IllegalArgumentException("URI is missing host component");
}
- // Port
- if (uri.getPort() != -1) {
+ StringBuffer sb = new StringBuffer();
+ sb.append(reversedHost);
+ if (uri.getPort() >= 0) {
sb.append(':');
sb.append(uri.getPort());
}
- if (uri.getRawPath() != null) {
- sb.append(uri.getRawPath());
+ if (uri.getPath() != null) {
+ sb.append('/');
+ sb.append(uri.getPath());
}
- if (uri.getRawQuery() != null) {
+ if (uri.getQuery() != null) {
sb.append('?');
- sb.append(uri.getRawQuery());
+ sb.append(uri.getQuery());
}
- if (uri.getRawFragment() != null) {
+ if (uri.getFragment() != null) {
sb.append('#');
- sb.append(uri.getRawFragment());
+ sb.append(uri.getFragment());
}
- // Constrain the key size to the maximum allowed row key length
- if (sb.length() > HConstants.MAX_ROW_LENGTH) {
- sb.setLength(HConstants.MAX_ROW_LENGTH);
+ if (sb.length() > HConstants.MAX_ROW_LENGTH) {
+ throw new IllegalArgumentException("Key would be too large (length=" + sb.length() +
+ ", limit=" + HConstants.MAX_ROW_LENGTH);
}
return Bytes.toBytes(sb.toString());
}
@@ -767,73 +774,92 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(),
key.getQualifierLength());
long ts = key.getTimestamp();
+ int retries = VERIFICATION_READ_RETRIES;
- if (Bytes.equals(INFO_FAMILY_NAME, family) &&
- Bytes.equals(CRC_QUALIFIER, qualifier)) {
+ while (true) {
- long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
+ if (Bytes.equals(INFO_FAMILY_NAME, family) &&
+ Bytes.equals(CRC_QUALIFIER, qualifier)) {
- Result result =
- table.get(new Get(row)
+ long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
+ Result result = table.get(new Get(row)
.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER)
.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER)
.setTimestamp(ts));
-
- byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
- if (content == null) {
- LOG.info("Row " + Bytes.toStringBinary(row) + ": missing content");
- output.getCounter(Counts.UNREFERENCED).increment(1);
- return;
- } else {
- CRC64 crc = new CRC64();
- crc.update(content);
- if (crc.getValue() != expectedCRC64) {
- LOG.info("Row " + Bytes.toStringBinary(row) + ": corrupt content");
+ byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
+ if (content == null) {
+ if (retries-- > 0) {
+ continue;
+ }
+ LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content");
+ output.getCounter(Counts.UNREFERENCED).increment(1);
+ return;
+ } else {
+ CRC64 crc = new CRC64();
+ crc.update(content);
+ if (crc.getValue() != expectedCRC64) {
+ if (retries-- > 0) {
+ continue;
+ }
+ LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
+ output.getCounter(Counts.CORRUPT).increment(1);
+ return;
+ }
+ }
+ byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
+ if (crc == null) {
+ if (retries-- > 0) {
+ continue;
+ }
+ LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c");
+ output.getCounter(Counts.UNREFERENCED).increment(1);
+ return;
+ }
+ if (Bytes.toLong(crc) != expectedCRC64) {
+ if (retries-- > 0) {
+ continue;
+ }
+ LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
output.getCounter(Counts.CORRUPT).increment(1);
return;
}
- }
- byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
- if (crc == null) {
- LOG.info("Row " + Bytes.toStringBinary(row) + ": missing i:c");
- output.getCounter(Counts.UNREFERENCED).increment(1);
- return;
- }
- if (Bytes.toLong(crc) != expectedCRC64) {
- LOG.info("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
- output.getCounter(Counts.CORRUPT).increment(1);
- return;
- }
- } else {
+ } else {
- Result result =
- table.get(new Get(row)
+ Result result = table.get(new Get(row)
.addColumn(family, qualifier)
.setTimestamp(ts));
+ byte[] bytes = result.getValue(family, qualifier);
+ if (bytes == null) {
+ if (retries-- > 0) {
+ continue;
+ }
+ LOG.error("Row " + Bytes.toStringBinary(row) + ": missing " +
+ Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
+ output.getCounter(Counts.UNREFERENCED).increment(1);
+ return;
+ }
+ if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
+ if (retries-- > 0) {
+ continue;
+ }
+ LOG.error("Row " + Bytes.toStringBinary(row) + ": " +
+ Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) +
+ " mismatch");
+ output.getCounter(Counts.CORRUPT).increment(1);
+ return;
+ }
- byte[] bytes = result.getValue(family, qualifier);
- if (bytes == null) {
- LOG.info("Row " + Bytes.toStringBinary(row) + ": missing " +
- Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
- output.getCounter(Counts.UNREFERENCED).increment(1);
- return;
- }
- if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
- LOG.info("Row " + Bytes.toStringBinary(row) + ": " +
- Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier) +
- " mismatch");
- output.getCounter(Counts.CORRUPT).increment(1);
- return;
}
- }
+ // If we fell through to here all verification checks have succeeded, potentially after
+ // retries, and we must exit the while loop.
+ output.getCounter(Counts.REFERENCED).increment(1);
+ break;
- output.getCounter(Counts.REFERENCED).increment(1);
+ }
}
-
}
-
}
}