You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mw...@apache.org on 2018/07/06 16:16:21 UTC

[accumulo] branch master updated: Added additional client properties for Scanner and BatchScanner (#548)

This is an automated email from the ASF dual-hosted git repository.

mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new f2d0371  Added additional client properties for Scanner and BatchScanner (#548)
f2d0371 is described below

commit f2d037138859681daf9072807c270f2ed96638a4
Author: Mike Walch <mw...@apache.org>
AuthorDate: Fri Jul 6 12:16:18 2018 -0400

    Added additional client properties for Scanner and BatchScanner (#548)
---
 .../org/apache/accumulo/core/client/Connector.java | 32 +++++++++++++++++++++-
 .../accumulo/core/client/impl/ConnectorImpl.java   | 30 +++++++++++++++++++-
 .../accumulo/core/conf/ClientConfigGenerate.java   |  2 ++
 .../apache/accumulo/core/conf/ClientProperty.java  |  8 ++++++
 4 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
index 2a7ee36..c85da31 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
@@ -57,6 +57,27 @@ public abstract class Connector {
       int numQueryThreads) throws TableNotFoundException;
 
   /**
+   * Factory method to create a BatchScanner connected to Accumulo. This method uses the number of
+   * query threads configured when the Connector was created, for example through
+   * {@link ConnectionOptions#withBatchScannerQueryThreads(int)}. If none were configured,
+   * defaults will be used.
+   *
+   * @param tableName
+   *          the name of the table to query
+   * @param authorizations
+   *          A set of authorization labels that will be checked against the column visibility of
+   *          each key in order to filter data. The authorizations passed in must be a subset of the
+   *          accumulo user's set of authorizations. If the accumulo user has authorizations (A1,
+   *          A2) and authorizations (A2, A3) are passed, then an exception will be thrown.
+   *
+   * @return BatchScanner object for configuring and querying
+   * @throws TableNotFoundException
+   *           when the specified table doesn't exist
+   * @since 2.0.0
+   */
+  public abstract BatchScanner createBatchScanner(String tableName, Authorizations authorizations)
+      throws TableNotFoundException;
+
+  /**
    * Factory method to create a BatchDeleter connected to Accumulo.
    *
    * @param tableName
@@ -165,7 +186,6 @@ public abstract class Connector {
    * @return BatchWriter object for configuring and writing data to
    * @since 1.5.0
    */
-
   public abstract BatchWriter createBatchWriter(String tableName, BatchWriterConfig config)
       throws TableNotFoundException;
 
@@ -585,6 +605,16 @@ public abstract class Connector {
      * @return this builder
      */
     ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig);
+
+    /**
+     * Build with default number of query threads for BatchScanner
+     *
+     * @param numQueryThreads
+     *          number of query threads to use as the default for BatchScanner
+     * @return this builder
+     */
+    ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads);
+
+    /**
+     * Build with default batch size for Scanner
+     *
+     * @param batchSize
+     *          batch size to use as the default for Scanner
+     * @return this builder
+     */
+    ConnectionOptions withScannerBatchSize(int batchSize);
   }
 
   public interface FromOptions extends ConnectionOptions, PropertyOptions, AuthenticationArgs {
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
index 244f764..eac7532 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -108,6 +109,15 @@ public class ConnectorImpl extends Connector {
         numQueryThreads);
   }
 
+  @Override
+  public BatchScanner createBatchScanner(String tableName, Authorizations authorizations)
+      throws TableNotFoundException {
+    // No thread count was passed in, so look it up in the client properties of this connector.
+    Integer numQueryThreads = ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS
+        .getInteger(context.getClientInfo().getProperties());
+    // Name the property in the NullPointerException so a missing value is easy to diagnose.
+    Objects.requireNonNull(numQueryThreads, "batch.scanner.num.query.threads is not set");
+    return createBatchScanner(tableName, authorizations, numQueryThreads);
+  }
+
   @Deprecated
   @Override
   public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
@@ -191,7 +201,13 @@ public class ConnectorImpl extends Connector {
       throws TableNotFoundException {
     checkArgument(tableName != null, "tableName is null");
     checkArgument(authorizations != null, "authorizations is null");
-    return new ScannerImpl(context, getTableId(tableName), authorizations);
+    Scanner scanner = new ScannerImpl(context, getTableId(tableName), authorizations);
+    Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE
+        .getInteger(context.getClientInfo().getProperties());
+    if (batchSize != null) {
+      scanner.setBatchSize(batchSize);
+    }
+    return scanner;
   }
 
   @Override
@@ -348,6 +364,18 @@ public class ConnectorImpl extends Connector {
     }
 
     @Override
+    public ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads) {
+      // Passed to setProperty (defined outside this hunk) under BATCH_SCANNER_NUM_QUERY_THREADS,
+      // the same property createBatchScanner(tableName, authorizations) looks up.
+      setProperty(ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS, numQueryThreads);
+      // Return the builder itself so option calls can be chained.
+      return this;
+    }
+
+    @Override
+    public ConnectionOptions withScannerBatchSize(int batchSize) {
+      // Passed to setProperty (defined outside this hunk) under SCANNER_BATCH_SIZE, the property
+      // ConnectorImpl reads before calling setBatchSize on a newly created ScannerImpl.
+      setProperty(ClientProperty.SCANNER_BATCH_SIZE, batchSize);
+      // Return the builder itself so option calls can be chained.
+      return this;
+    }
+
+    @Override
     public SaslOptions withPrimary(String kerberosServerPrimary) {
       setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY, kerberosServerPrimary);
       return this;
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
index 7b95ba2..691c660 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
@@ -47,6 +47,8 @@ class ClientConfigGenerate {
       generateSection("Instance", "instance.");
       generateSection("Authentication", "auth.", "auth.type", "auth.principal");
       generateSection("Batch Writer", "batch.writer.");
+      generateSection("Batch Scanner", "batch.scanner.");
+      generateSection("Scanner", "scanner.");
       generateSection("SSL", "ssl.");
       generateSection("SASL", "sasl.");
       generateSection("Tracing", "trace.");
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
index 1ec1609..0b47e7f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
@@ -61,6 +61,14 @@ public enum ClientProperty {
       "Change the" + " durability for the BatchWriter session. To use the table's durability"
           + " setting. use \"default\" which is the table's durability setting."),
 
+  // Scanner: batch size that ConnectorImpl applies to each ScannerImpl it creates
+  // (the second constructor argument is presumably the default value - confirm)
+  SCANNER_BATCH_SIZE("scanner.batch.size", "1000",
+      "Number of key/value pairs that will be fetched at a time from tablet server"),
+
+  // BatchScanner: thread count used when createBatchScanner is called without an explicit one
+  BATCH_SCANNER_NUM_QUERY_THREADS("batch.scanner.num.query.threads", "3",
+      "Number of concurrent query threads to spawn for querying"),
+
   // Bulk load
   BULK_LOAD_THREADS("bulk.threads", "8C",
       "The number of threads used to inspect bulk load files to determine where files go.  "