You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2019/09/19 22:46:55 UTC
[accumulo] branch 1.9 updated: Add colVis file to ContinuousInputFormat. Fixes #1368 (#1369)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.9 by this push:
new af05b27 Add colVis file to ContinuousInputFormat. Fixes #1368 (#1369)
af05b27 is described below
commit af05b27ae9e2a576907e7d1d96d8360f187434a7
Author: Mike Miller <mm...@apache.org>
AuthorDate: Thu Sep 19 18:46:49 2019 -0400
Add colVis file to ContinuousInputFormat. Fixes #1368 (#1369)
---
.../accumulo/test/continuous/ContinuousIngest.java | 25 +++++++++++++---------
.../test/continuous/ContinuousInputFormat.java | 17 +++++++++++++--
2 files changed, 30 insertions(+), 12 deletions(-)
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
index f1fd4d9..17342bc 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.test.continuous;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.BufferedReader;
+import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
@@ -57,19 +58,23 @@ public class ContinuousIngest {
return;
}
- visibilities = new ArrayList<>();
-
- FileSystem fs = FileSystem.get(new Configuration());
- BufferedReader in =
- new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile)), UTF_8));
+ visibilities = readVisFromFile(opts.visFile);
+ }
- String line;
+ public static List<ColumnVisibility> readVisFromFile(String visFile) {
+ List<ColumnVisibility> vis = new ArrayList<>();
- while ((line = in.readLine()) != null) {
- visibilities.add(new ColumnVisibility(line));
+ try (BufferedReader in = new BufferedReader(new InputStreamReader(
+ FileSystem.get(new Configuration()).open(new Path(visFile)), UTF_8))) {
+ String line;
+ while ((line = in.readLine()) != null) {
+ vis.add(new ColumnVisibility(line));
+ }
+ } catch (IOException e) {
+ System.out.println("ERROR reading visFile " + visFile + ": ");
+ e.printStackTrace();
}
-
- in.close();
+ return vis;
}
private static ColumnVisibility getVisibility(Random rand) {
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java
index 6aed14a..f140b71 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.zip.CRC32;
@@ -34,6 +35,7 @@ import java.util.zip.Checksum;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -56,6 +58,7 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> {
private static final String PROP_FAM_MAX = "mrbulk.fam.max";
private static final String PROP_QUAL_MAX = "mrbulk.qual.max";
private static final String PROP_CHECKSUM = "mrbulk.checksum";
+ private static final String PROP_VIS_FILE = "mrbulk.vis.file";
private static class RandomSplit extends InputSplit implements Writable {
@Override
@@ -93,6 +96,8 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> {
conf.setLong(PROP_ROW_MAX, opts.max);
conf.setInt(PROP_FAM_MAX, opts.maxColF);
conf.setInt(PROP_QUAL_MAX, opts.maxColQ);
+ if (opts.visFile != null)
+ conf.set(PROP_VIS_FILE, opts.visFile);
conf.setBoolean(PROP_CHECKSUM, opts.checksum);
}
@@ -110,6 +115,7 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> {
long maxRow;
int maxFam;
int maxQual;
+ List<ColumnVisibility> visibilities;
boolean checksum;
Key prevKey;
@@ -126,6 +132,12 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> {
maxFam = job.getConfiguration().getInt(PROP_FAM_MAX, Short.MAX_VALUE);
maxQual = job.getConfiguration().getInt(PROP_QUAL_MAX, Short.MAX_VALUE);
checksum = job.getConfiguration().getBoolean(PROP_CHECKSUM, false);
+ String visFile = job.getConfiguration().get(PROP_VIS_FILE);
+ if (visFile == null) {
+ visibilities = Collections.singletonList(new ColumnVisibility());
+ } else {
+ visibilities = ContinuousIngest.readVisFromFile(visFile);
+ }
random = new Random(new SecureRandom().nextLong());
@@ -138,15 +150,16 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> {
byte[] fam = genCol(random.nextInt(maxFam));
byte[] qual = genCol(random.nextInt(maxQual));
+ byte[] cv = visibilities.get(random.nextInt(visibilities.size())).flatten();
if (cksum != null) {
cksum.update(row);
cksum.update(fam);
cksum.update(qual);
- cksum.update(new byte[0]); // TODO col vis
+ cksum.update(cv);
}
- return new Key(row, fam, qual);
+ return new Key(row, fam, qual, cv);
}
private byte[] createValue(byte[] ingestInstanceId, byte[] prevRow, Checksum cksum) {