You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mw...@apache.org on 2018/07/06 16:16:21 UTC
[accumulo] branch master updated: Added additional client properties for Scanner and BatchScanner (#548)
This is an automated email from the ASF dual-hosted git repository.
mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new f2d0371 Added additional client properties for Scanner and BatchScanner (#548)
f2d0371 is described below
commit f2d037138859681daf9072807c270f2ed96638a4
Author: Mike Walch <mw...@apache.org>
AuthorDate: Fri Jul 6 12:16:18 2018 -0400
Added additional client properties for Scanner and BatchScanner (#548)
---
.../org/apache/accumulo/core/client/Connector.java | 32 +++++++++++++++++++++-
.../accumulo/core/client/impl/ConnectorImpl.java | 30 +++++++++++++++++++-
.../accumulo/core/conf/ClientConfigGenerate.java | 2 ++
.../apache/accumulo/core/conf/ClientProperty.java | 8 ++++++
4 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
index 2a7ee36..c85da31 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
@@ -57,6 +57,27 @@ public abstract class Connector {
int numQueryThreads) throws TableNotFoundException;
/**
+ * Factory method to create a BatchScanner connected to Accumulo. This method uses the number of
+ * query threads configured when Connector was created. If none were configured, defaults will be
+ * used.
+ *
+ * @param tableName
+ * the name of the table to query
+ * @param authorizations
+ * A set of authorization labels that will be checked against the column visibility of
+ * each key in order to filter data. The authorizations passed in must be a subset of the
+ * accumulo user's set of authorizations. If the accumulo user has authorizations (A1,
+ * A2) and authorizations (A2, A3) are passed, then an exception will be thrown.
+ *
+ * @return BatchScanner object for configuring and querying
+ * @throws TableNotFoundException
+ * when the specified table doesn't exist
+ * @since 2.0.0
+ */
+ public abstract BatchScanner createBatchScanner(String tableName, Authorizations authorizations)
+ throws TableNotFoundException;
+
+ /**
* Factory method to create a BatchDeleter connected to Accumulo.
*
* @param tableName
@@ -165,7 +186,6 @@ public abstract class Connector {
* @return BatchWriter object for configuring and writing data to
* @since 1.5.0
*/
-
public abstract BatchWriter createBatchWriter(String tableName, BatchWriterConfig config)
throws TableNotFoundException;
@@ -585,6 +605,16 @@ public abstract class Connector {
* @return this builder
*/
ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig);
+
+ /**
+ * Build with default number of query threads for BatchScanner
+ */
+ ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads);
+
+ /**
+ * Build with default batch size for Scanner
+ */
+ ConnectionOptions withScannerBatchSize(int batchSize);
}
public interface FromOptions extends ConnectionOptions, PropertyOptions, AuthenticationArgs {
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
index 244f764..eac7532 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -108,6 +109,15 @@ public class ConnectorImpl extends Connector {
numQueryThreads);
}
+ @Override
+ public BatchScanner createBatchScanner(String tableName, Authorizations authorizations)
+ throws TableNotFoundException {
+ Integer numQueryThreads = ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS
+ .getInteger(context.getClientInfo().getProperties());
+ Objects.requireNonNull(numQueryThreads);
+ return createBatchScanner(tableName, authorizations, numQueryThreads);
+ }
+
@Deprecated
@Override
public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
@@ -191,7 +201,13 @@ public class ConnectorImpl extends Connector {
throws TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
checkArgument(authorizations != null, "authorizations is null");
- return new ScannerImpl(context, getTableId(tableName), authorizations);
+ Scanner scanner = new ScannerImpl(context, getTableId(tableName), authorizations);
+ Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE
+ .getInteger(context.getClientInfo().getProperties());
+ if (batchSize != null) {
+ scanner.setBatchSize(batchSize);
+ }
+ return scanner;
}
@Override
@@ -348,6 +364,18 @@ public class ConnectorImpl extends Connector {
}
@Override
+ public ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads) {
+ setProperty(ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS, numQueryThreads);
+ return this;
+ }
+
+ @Override
+ public ConnectionOptions withScannerBatchSize(int batchSize) {
+ setProperty(ClientProperty.SCANNER_BATCH_SIZE, batchSize);
+ return this;
+ }
+
+ @Override
public SaslOptions withPrimary(String kerberosServerPrimary) {
setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY, kerberosServerPrimary);
return this;
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
index 7b95ba2..691c660 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
@@ -47,6 +47,8 @@ class ClientConfigGenerate {
generateSection("Instance", "instance.");
generateSection("Authentication", "auth.", "auth.type", "auth.principal");
generateSection("Batch Writer", "batch.writer.");
+ generateSection("Batch Scanner", "batch.scanner.");
+ generateSection("Scanner", "scanner.");
generateSection("SSL", "ssl.");
generateSection("SASL", "sasl.");
generateSection("Tracing", "trace.");
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
index 1ec1609..0b47e7f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
@@ -61,6 +61,14 @@ public enum ClientProperty {
"Change the" + " durability for the BatchWriter session. To use the table's durability"
+ " setting. use \"default\" which is the table's durability setting."),
+ // Scanner
+ SCANNER_BATCH_SIZE("scanner.batch.size", "1000",
+ "Number of key/value pairs that will be fetched at time from tablet server"),
+
+ // BatchScanner
+ BATCH_SCANNER_NUM_QUERY_THREADS("batch.scanner.num.query.threads", "3",
+ "Number of concurrent query threads to spawn for querying"),
+
// Bulk load
BULK_LOAD_THREADS("bulk.threads", "8C",
"The number of threads used to inspect bulk load files to determine where files go. "