You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2019/09/19 22:46:55 UTC

[accumulo] branch 1.9 updated: Add colVis file to ContinuousInputFormat. Fixes #1368 (#1369)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new af05b27  Add colVis file to ContinuousInputFormat. Fixes #1368 (#1369)
af05b27 is described below

commit af05b27ae9e2a576907e7d1d96d8360f187434a7
Author: Mike Miller <mm...@apache.org>
AuthorDate: Thu Sep 19 18:46:49 2019 -0400

    Add colVis file to ContinuousInputFormat. Fixes #1368 (#1369)
---
 .../accumulo/test/continuous/ContinuousIngest.java | 25 +++++++++++++---------
 .../test/continuous/ContinuousInputFormat.java     | 17 +++++++++++++--
 2 files changed, 30 insertions(+), 12 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
index f1fd4d9..17342bc 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.test.continuous;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.io.BufferedReader;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -57,19 +58,23 @@ public class ContinuousIngest {
       return;
     }
 
-    visibilities = new ArrayList<>();
-
-    FileSystem fs = FileSystem.get(new Configuration());
-    BufferedReader in =
-        new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile)), UTF_8));
+    visibilities = readVisFromFile(opts.visFile);
+  }
 
-    String line;
+  public static List<ColumnVisibility> readVisFromFile(String visFile) {
+    List<ColumnVisibility> vis = new ArrayList<>();
 
-    while ((line = in.readLine()) != null) {
-      visibilities.add(new ColumnVisibility(line));
+    try (BufferedReader in = new BufferedReader(new InputStreamReader(
+        FileSystem.get(new Configuration()).open(new Path(visFile)), UTF_8))) {
+      String line;
+      while ((line = in.readLine()) != null) {
+        vis.add(new ColumnVisibility(line));
+      }
+    } catch (IOException e) {
+      System.out.println("ERROR reading visFile " + visFile + ": ");
+      e.printStackTrace();
     }
-
-    in.close();
+    return vis;
   }
 
   private static ColumnVisibility getVisibility(Random rand) {
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java
index 6aed14a..f140b71 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.zip.CRC32;
@@ -34,6 +35,7 @@ import java.util.zip.Checksum;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -56,6 +58,7 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> {
   private static final String PROP_FAM_MAX = "mrbulk.fam.max";
   private static final String PROP_QUAL_MAX = "mrbulk.qual.max";
   private static final String PROP_CHECKSUM = "mrbulk.checksum";
+  private static final String PROP_VIS_FILE = "mrbulk.vis.file";
 
   private static class RandomSplit extends InputSplit implements Writable {
     @Override
@@ -93,6 +96,8 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> {
     conf.setLong(PROP_ROW_MAX, opts.max);
     conf.setInt(PROP_FAM_MAX, opts.maxColF);
     conf.setInt(PROP_QUAL_MAX, opts.maxColQ);
+    if (opts.visFile != null)
+      conf.set(PROP_VIS_FILE, opts.visFile);
     conf.setBoolean(PROP_CHECKSUM, opts.checksum);
   }
 
@@ -110,6 +115,7 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> {
       long maxRow;
       int maxFam;
       int maxQual;
+      List<ColumnVisibility> visibilities;
       boolean checksum;
 
       Key prevKey;
@@ -126,6 +132,12 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> {
         maxFam = job.getConfiguration().getInt(PROP_FAM_MAX, Short.MAX_VALUE);
         maxQual = job.getConfiguration().getInt(PROP_QUAL_MAX, Short.MAX_VALUE);
         checksum = job.getConfiguration().getBoolean(PROP_CHECKSUM, false);
+        String visFile = job.getConfiguration().get(PROP_VIS_FILE);
+        if (visFile == null) {
+          visibilities = Collections.singletonList(new ColumnVisibility());
+        } else {
+          visibilities = ContinuousIngest.readVisFromFile(visFile);
+        }
 
         random = new Random(new SecureRandom().nextLong());
 
@@ -138,15 +150,16 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> {
 
         byte[] fam = genCol(random.nextInt(maxFam));
         byte[] qual = genCol(random.nextInt(maxQual));
+        byte[] cv = visibilities.get(random.nextInt(visibilities.size())).flatten();
 
         if (cksum != null) {
           cksum.update(row);
           cksum.update(fam);
           cksum.update(qual);
-          cksum.update(new byte[0]); // TODO col vis
+          cksum.update(cv);
         }
 
-        return new Key(row, fam, qual);
+        return new Key(row, fam, qual, cv);
       }
 
       private byte[] createValue(byte[] ingestInstanceId, byte[] prevRow, Checksum cksum) {