You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this copy.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2018/10/05 19:35:32 UTC

[accumulo] branch master updated: Add methods to set props in Rfile summaries. Fixes #612 (#679)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new d1da917  Add methods to set props in Rfile summaries. Fixes #612 (#679)
d1da917 is described below

commit d1da9172dda8301c9ecbdecf89dd80a59ce9b1e4
Author: Mike Miller <mm...@apache.org>
AuthorDate: Fri Oct 5 15:35:21 2018 -0400

    Add methods to set props in Rfile summaries. Fixes #612 (#679)
---
 .../apache/accumulo/core/client/rfile/RFile.java   | 22 +++++++++++
 .../core/client/rfile/RFileSummariesRetriever.java | 32 ++++++++++++---
 .../accumulo/core/security/crypto/CryptoTest.java  | 46 +++++++++++++++++++++-
 3 files changed, 93 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
index c0f41fb..53039d5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
@@ -259,6 +259,28 @@ public class RFile {
    */
   public interface SummaryOptions {
     /**
+     * Retrieve summaries using the provided table properties. Properties for a table can be
+     * obtained by calling {@link TableOperations#getProperties(String)}. Any property that impacts
+     * file behavior may be accepted and used even if it lacks the {@link Property#TABLE_PREFIX}
+     * prefix. For example, cache and crypto properties could be passed here.
+     *
+     * @param props
+     *          iterable over Accumulo table key/value properties; must not be null
+     * @return this
+     */
+    SummaryOptions withTableProperties(Iterable<Entry<String,String>> props);
+
+    /**
+     * Same as {@link #withTableProperties(Iterable)}, but takes the properties as a map. Any
+     * property that impacts file behavior may be accepted and used even if it lacks the
+     * {@link Property#TABLE_PREFIX} prefix (for example, cache and crypto properties).
+     * @param props
+     *          table key/value properties as a map; must not be null
+     * @return this
+     */
+    SummaryOptions withTableProperties(Map<String,String> props);
+
+    /**
      * This method allows retrieving a subset of summary data from a file. If a file has lots of
      * separate summaries, reading a subset may be faster.
      *
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
index 07d7aaa..442e41e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
@@ -20,6 +20,8 @@ package org.apache.accumulo.core.client.rfile;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.function.Predicate;
 
@@ -29,14 +31,13 @@ import org.apache.accumulo.core.client.rfile.RFile.SummaryOptions;
 import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs;
 import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.client.summary.Summary;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.security.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.summary.Gatherer;
 import org.apache.accumulo.core.summary.SummarizerFactory;
 import org.apache.accumulo.core.summary.SummaryCollection;
 import org.apache.accumulo.core.summary.SummaryReader;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 
@@ -46,6 +47,7 @@ class RFileSummariesRetriever implements SummaryInputArguments, SummaryFSOptions
   private Text startRow;
   private InputArgs in;
   private Text endRow;
+  private Map<String,String> config = Collections.emptyMap();
 
   @Override
   public SummaryOptions selectSummaries(Predicate<SummarizerConfiguration> summarySelector) {
@@ -81,15 +83,15 @@ class RFileSummariesRetriever implements SummaryInputArguments, SummaryFSOptions
   @Override
   public Collection<Summary> read() throws IOException {
     SummarizerFactory factory = new SummarizerFactory();
-    AccumuloConfiguration acuconf = DefaultConfiguration.getInstance();
-    Configuration conf = in.getFileSystem().getConf();
+    ConfigurationCopy acuconf = new ConfigurationCopy(DefaultConfiguration.getInstance());
+    config.forEach((k, v) -> acuconf.set(k, v));
 
     RFileSource[] sources = in.getSources();
     try {
       SummaryCollection all = new SummaryCollection();
       for (RFileSource source : sources) {
-        SummaryReader fileSummary = SummaryReader.load(conf, acuconf, source.getInputStream(),
-            source.getLength(), summarySelector, factory,
+        SummaryReader fileSummary = SummaryReader.load(in.getFileSystem().getConf(), acuconf,
+            source.getInputStream(), source.getLength(), summarySelector, factory,
             CryptoServiceFactory.newInstance(acuconf));
         SummaryCollection sc = fileSummary
             .getSummaries(Collections.singletonList(new Gatherer.RowRange(startRow, endRow)));
@@ -123,4 +125,22 @@ class RFileSummariesRetriever implements SummaryInputArguments, SummaryFSOptions
     in = new InputArgs(files);
     return this;
   }
+
+  @Override
+  public SummaryOptions withTableProperties(Iterable<Map.Entry<String,String>> props) {
+    Objects.requireNonNull(props);
+    // Snapshot the properties into a private map so later changes to the caller's source are not
+    // seen; this replaces (rather than merges with) properties from any earlier call.
+    Map<String,String> snapshot = new HashMap<>();
+    props.forEach(prop -> snapshot.put(prop.getKey(), prop.getValue()));
+    config = snapshot;
+    return this;
+  }
+
+  @Override
+  public SummaryOptions withTableProperties(Map<String,String> props) {
+    // Reject null up front, then reuse the Iterable overload (which returns this).
+    Objects.requireNonNull(props);
+    return withTableProperties(props.entrySet());
+  }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/security/crypto/CryptoTest.java b/core/src/test/java/org/apache/accumulo/core/security/crypto/CryptoTest.java
index 4eb6b2f..00c3d66 100644
--- a/core/src/test/java/org/apache/accumulo/core/security/crypto/CryptoTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/security/crypto/CryptoTest.java
@@ -35,6 +35,7 @@ import java.security.NoSuchProviderException;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Map;
 
 import javax.crypto.Cipher;
@@ -44,6 +45,9 @@ import javax.crypto.spec.SecretKeySpec;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.rfile.RFileWriter;
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
@@ -71,6 +75,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import com.google.common.collect.Iterables;
+
 public class CryptoTest {
 
   public static final int MARKER_INT = 0xCADEFEDD;
@@ -192,11 +198,13 @@ public class CryptoTest {
     AccumuloConfiguration cryptoOnConf = getAccumuloConfig(CRYPTO_ON_CONF);
     FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance());
     ArrayList<Key> keys = testData();
+    SummarizerConfiguration sumConf = SummarizerConfiguration.builder(KeyCounter.class.getName())
+        .build();
 
     String file = "target/testFile1.rf";
     fs.delete(new Path(file), true);
     try (RFileWriter writer = RFile.newWriter().to(file).withFileSystem(fs)
-        .withTableProperties(cryptoOnConf).build()) {
+        .withTableProperties(cryptoOnConf).withSummarizers(sumConf).build()) {
       Value empty = new Value(new byte[] {});
       writer.startDefaultLocalityGroup();
       for (Key key : keys) {
@@ -209,6 +217,15 @@ public class CryptoTest {
     ArrayList<Key> keysRead = new ArrayList<>();
     iter.forEach(e -> keysRead.add(e.getKey()));
     assertEquals(keys, keysRead);
+
+    Collection<Summary> summaries = RFile.summaries().from(file).withFileSystem(fs)
+        .withTableProperties(cryptoOnConf).read();
+    Summary summary = Iterables.getOnlyElement(summaries);
+    assertEquals(keys.size(), (long) summary.getStatistics().get("keys"));
+    assertEquals(1, summary.getStatistics().size());
+    assertEquals(0, summary.getFileStatistics().getInaccurate());
+    assertEquals(1, summary.getFileStatistics().getTotal());
+
   }
 
   @Test
@@ -365,4 +382,31 @@ public class CryptoTest {
     return Arrays.toString(stringMarkerBytes);
   }
 
+  // Minimal summarizer that counts non-deleted keys; used to check summaries work with crypto.
+  public static class KeyCounter implements Summarizer {
+    @Override
+    public Collector collector(SummarizerConfiguration sc) {
+      return new Collector() {
+        private long keys = 0;
+
+        @Override
+        public void accept(Key k, Value v) {
+          if (!k.isDeleted()) {
+            keys++;
+          }
+        }
+
+        @Override
+        public void summarize(StatisticConsumer consumer) {
+          consumer.accept("keys", keys);
+        }
+      };
+    }
+
+    @Override
+    public Combiner combiner(SummarizerConfiguration sc) {
+      // Merge two partial statistic maps by summing the counts that share a name.
+      return (m1, m2) -> m2.forEach((k, v) -> m1.merge(k, v, Long::sum));
+    }
+
 }