You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2023/01/15 04:08:15 UTC
[fluo] branch main updated: Adds scan time authorizations to transactions/snapshots (#1120)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluo.git
The following commit(s) were added to refs/heads/main by this push:
new 4bdb9389 Adds scan time authorizations to transactions/snapshots (#1120)
4bdb9389 is described below
commit 4bdb938939801c531589281751677c25cbfd0565
Author: Bill S <wj...@users.noreply.github.com>
AuthorDate: Sat Jan 14 23:08:09 2023 -0500
Adds scan time authorizations to transactions/snapshots (#1120)
1. Allows a user to specify a collection of authorizations/visibility labels
to use when scanning data. Writes are allowed so long as the current
connection's authorizations satisfy any visibility expressions.
2. Connection-wide authorizations are set in the fluo configuration and must
be set on the underlying Accumulo user.
Co-authored-by: Bill Slacum <bi...@glidedog.com>
---
.../java/org/apache/fluo/api/client/Snapshot.java | 2 +
.../org/apache/fluo/api/client/SnapshotBase.java | 20 +++
.../apache/fluo/api/config/FluoConfiguration.java | 23 +++
.../fluo/api/config/SimpleConfiguration.java | 28 ++++
.../java/org/apache/fluo/command/ScanTest.java | 4 +-
.../apache/fluo/core/client/FluoClientImpl.java | 3 +
.../org/apache/fluo/core/impl/Environment.java | 11 ++
.../fluo/core/impl/ParallelSnapshotScanner.java | 29 +++-
.../org/apache/fluo/core/impl/SnapshotScanner.java | 14 +-
.../org/apache/fluo/core/impl/TransactionImpl.java | 56 +++++--
.../fluo/core/impl/scanner/ScannerBuilderImpl.java | 22 ++-
.../client/FluoClientAuthorizationsIT.java | 170 +++++++++++++++++++++
.../fluo/integration/client/FluoClientIT.java | 29 ++++
13 files changed, 387 insertions(+), 24 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/Snapshot.java b/modules/api/src/main/java/org/apache/fluo/api/client/Snapshot.java
index 625cf626..824a14e2 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/Snapshot.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/Snapshot.java
@@ -15,6 +15,8 @@
package org.apache.fluo.api.client;
+import java.util.Collection;
+
/**
* Allows users to read from a Fluo table at a certain point in time. Snapshot extends
* {@link SnapshotBase} to include a {@link #close} method which must be called when you are
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
index 7c4a2269..ff0a694c 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
@@ -203,4 +203,24 @@ public interface SnapshotBase {
default CompletableFuture<Bytes> getAsync(Bytes row, Column column, Bytes defaultValue) {
return CompletableFuture.completedFuture(get(row, column, defaultValue));
}
+
+ /**
+ * All reads done using this snapshot after this call will use the passed in authorizations to
+ * filter data.
+ *
+ * <p>
+ * This default implementation does not support scan time authorizations and always throws.
+ *
+ * @param authorizations the authorization labels to apply to subsequent reads
+ * @throws UnsupportedOperationException always, unless an implementation overrides this method
+ * @since 2.0.0
+ */
+ default void setScanTimeAuthorizations(Collection<String> authorizations) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns the set of scan time authorizations that are currently in use for filtering data. An
+ * empty set means no authorizations are in effect, so only data without a visibility label is
+ * readable.
+ *
+ * <p>
+ * This default implementation does not support scan time authorizations and always throws.
+ *
+ * @return the authorization labels currently applied to reads
+ * @throws UnsupportedOperationException always, unless an implementation overrides this method
+ * @since 2.0.0
+ */
+ default Collection<String> getScanTimeAuthorizations() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
index 3cf944b5..b18b21c3 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
@@ -151,6 +151,9 @@ public class FluoConfiguration extends SimpleConfiguration {
* @since 1.2.0
*/
public static final String ACCUMULO_USER_PROP = ACCUMULO_PREFIX + ".user";
+
+ /**
+ * Property key under which the connection-wide Accumulo authorizations are stored.
+ *
+ * @since 2.0.0
+ */
+ public static final String ACCUMULO_AUTH_PROP = ACCUMULO_PREFIX + ".auths";
+
/**
* @since 1.2.0
*/
@@ -486,6 +489,7 @@ public class FluoConfiguration extends SimpleConfiguration {
return getDepNonEmptyString(ACCUMULO_USER_PROP, CLIENT_ACCUMULO_USER_PROP);
}
+
/**
* Sets the Apache Accumulo password property {@value #ACCUMULO_PASSWORD_PROP}
*
@@ -510,6 +514,25 @@ public class FluoConfiguration extends SimpleConfiguration {
throw new NoSuchElementException(ACCUMULO_PASSWORD_PROP + " is not set!");
}
+ /**
+ * Sets the Apache Accumulo authorizations property {@value #ACCUMULO_AUTH_PROP}.
+ *
+ * @param auths authorization labels to store; handed unchanged to {@code setProperties}
+ * @return this configuration, so calls can be chained
+ * @since 2.0.0
+ */
+ public FluoConfiguration setAccumuloAuthorizations(String... auths) {
+ setProperties(ACCUMULO_AUTH_PROP, auths);
+ return this;
+ }
+
+ /**
+  * Gets the Apache Accumulo authorizations stored under {@value #ACCUMULO_AUTH_PROP}.
+  *
+  * @return the configured authorization labels, or an empty array when the property is not set
+  * @since 2.0.0
+  */
+ public String[] getAccumuloAuthorizations() {
+   // Guard first: an absent key short-circuits to "no authorizations".
+   if (!containsKey(ACCUMULO_AUTH_PROP)) {
+     return new String[0];
+   }
+   return getProperties(ACCUMULO_AUTH_PROP);
+ }
+
/**
* Sets the value of the property {@value #ACCUMULO_ZOOKEEPERS_PROP}
*
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java
index b48dd835..88aebb0b 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java
@@ -30,6 +30,7 @@ import java.io.StringReader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.file.Files;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@@ -242,6 +243,33 @@ public class SimpleConfiguration implements Serializable {
internalConfig.setProperty(key, value);
}
+ /**
+  * Sets a property to a list of values.
+  *
+  * <p>
+  * The array is copied before it is validated and stored, so later changes the caller makes to
+  * {@code values} cannot affect what was checked or what is handed to the underlying
+  * configuration.
+  *
+  * @param key the property key
+  * @param values the values to associate with {@code key}; neither the array nor any element may
+  *        be null
+  * @throws NullPointerException if {@code values} or any of its elements is null
+  * @since 2.0.0
+  */
+ public void setProperties(String key, String... values) {
+   Objects.requireNonNull(values, "Values for key `" + key + "` must be non-null.");
+   // don't let callers modify the array underneath of us
+   String[] copy = values.clone();
+   for (String value : copy) {
+     Objects.requireNonNull(value, "Encountered null value for key `" + key + "`.");
+   }
+   // Store the validated snapshot, not the caller-owned array.
+   internalConfig.setProperty(key, copy);
+ }
+
+ /**
+  * Returns every value stored under a key as a string array.
+  *
+  * <p>
+  * The value handed back by the underlying configuration is inspected before it is converted,
+  * instead of being cast blindly to a list: a collection is copied element by element, and any
+  * other single value is returned as a one-element array.
+  *
+  * @param key the property key
+  * @return the values stored under {@code key}; an empty array if the key has no value
+  * @since 2.0.0
+  */
+ public String[] getProperties(String key) {
+   Object raw = internalConfig.getProperty(key);
+   if (raw == null) {
+     return new String[0];
+   }
+   // NOTE(review): presumably a single-valued key comes back as a bare value rather than a
+   // list - confirm against the backing configuration. Checking the type avoids a
+   // ClassCastException either way.
+   if (raw instanceof java.util.Collection) {
+     java.util.Collection<?> items = (java.util.Collection<?>) raw;
+     ArrayList<String> values = new ArrayList<>(items.size());
+     for (Object item : items) {
+       values.add(String.valueOf(item));
+     }
+     return values.toArray(new String[0]);
+   }
+   return new String[] {String.valueOf(raw)};
+ }
+
/**
* Returns a subset of config that start with given prefix. The prefix will not be present in keys
* of the returned config. Any changes made to the returned config will be made to this and visa
diff --git a/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java b/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
index fabc3aaf..f7f1dd3f 100644
--- a/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
+++ b/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
@@ -16,6 +16,7 @@
package org.apache.fluo.command;
import com.beust.jcommander.JCommander;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
@@ -34,7 +35,8 @@ public class ScanTest {
JCommander jcommand = new JCommander(scan);
jcommand.parse(args.split(" "));
ScanUtil.ScanOpts opts = scan.getScanOpts();
- return new SnapshotScanner.Opts(ScanUtil.getSpan(opts), ScanUtil.getColumns(opts), false);
+ return new SnapshotScanner.Opts(ScanUtil.getSpan(opts), ScanUtil.getColumns(opts), false,
+ Authorizations.EMPTY);
}
@Test
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java
index 22749900..797f7ffb 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java
@@ -15,11 +15,14 @@
package org.apache.fluo.core.client;
+import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.client.Snapshot;
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
index f9a23162..9c35743e 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
@@ -18,10 +18,14 @@ package org.apache.fluo.core.impl;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.Map.Entry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -102,6 +106,13 @@ public class Environment implements AutoCloseable {
+ client.instanceOperations().getInstanceId() + " != " + accumuloInstanceID);
}
+ String[] auths = config.getAccumuloAuthorizations();
+ if (auths.length == 0) {
+ this.auths = new Authorizations();
+ } else {
+ this.auths = new Authorizations(auths);
+ }
+
try {
resources = new SharedResources(this);
} catch (TableNotFoundException e1) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
index 4d2b37ce..17f4620f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
@@ -56,9 +57,11 @@ public class ParallelSnapshotScanner {
private Map<Bytes, Set<Column>> readLocksSeen;
private Consumer<Entry<Key, Value>> writeLocksSeen;
+ private Authorizations authorizations;
+
ParallelSnapshotScanner(Collection<Bytes> rows, Set<Column> columns, Environment env,
long startTs, TxStats stats, Map<Bytes, Set<Column>> readLocksSeen,
- Consumer<Entry<Key, Value>> writeLocksSeen) {
+ Consumer<Entry<Key, Value>> writeLocksSeen, Authorizations authorizations) {
this.rows = rows;
this.columns = columns;
this.env = env;
@@ -68,19 +71,33 @@ public class ParallelSnapshotScanner {
this.columnConverter = new CachedColumnConverter(columns);
this.readLocksSeen = readLocksSeen;
this.writeLocksSeen = writeLocksSeen;
+ this.authorizations = authorizations;
+ }
+
+ /**
+ * Creates a scanner over the given rows and columns that scans with the authorizations returned
+ * by {@code env.getAuthorizations()}. All other arguments are passed through unchanged to the
+ * constructor that takes explicit authorizations.
+ */
+ ParallelSnapshotScanner(Collection<Bytes> rows, Set<Column> columns, Environment env,
+ long startTs, TxStats stats, Map<Bytes, Set<Column>> readLocksSeen,
+ Consumer<Entry<Key, Value>> writeLocksSeen) {
+ this(rows, columns, env, startTs, stats, readLocksSeen, writeLocksSeen,
+ env.getAuthorizations());
}
+ /**
+ * Creates a scanner over the given cells that scans with the authorizations returned by
+ * {@code env.getAuthorizations()}. All other arguments are passed through unchanged to the
+ * constructor that takes explicit authorizations.
+ */
ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long startTs, TxStats stats,
Map<Bytes, Set<Column>> readLocksSeen, Consumer<Entry<Key, Value>> writeLocksSeen) {
+ this(cells, env, startTs, stats, readLocksSeen, writeLocksSeen, env.getAuthorizations());
+ }
+
+ ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long startTs, TxStats stats,
+ Map<Bytes, Set<Column>> readLocksSeen, Consumer<Entry<Key, Value>> writeLocksSeen,
+ Authorizations authorizations) {
for (RowColumn rc : cells) {
byte[] r = rc.getRow().toArray();
byte[] cf = rc.getColumn().getFamily().toArray();
byte[] cq = rc.getColumn().getQualifier().toArray();
- byte[] cv = rc.getColumn().getVisibility().toArray();
+ byte[] cv = new byte[0];
+ byte[] cv2 = new byte[] {(byte) 0xff};
Key start = new Key(r, cf, cq, cv, Long.MAX_VALUE, false, false);
- Key end = new Key(start);
- end.setTimestamp(Long.MIN_VALUE);
+ Key end = new Key(r, cf, cq, cv2, Long.MIN_VALUE, false, false);
rangesToScan.add(new Range(start, true, end, true));
}
@@ -92,6 +109,7 @@ public class ParallelSnapshotScanner {
this.columnConverter = ColumnUtil::convert;
this.readLocksSeen = readLocksSeen;
this.writeLocksSeen = writeLocksSeen;
+ this.authorizations = authorizations;
}
private BatchScanner setupBatchScanner() {
@@ -100,8 +118,7 @@ public class ParallelSnapshotScanner {
try {
// TODO hardcoded number of threads!
// one thread is probably good.. going for throughput
- scanner =
- env.getAccumuloClient().createBatchScanner(env.getTable(), env.getAuthorizations(), 1);
+ scanner = env.getAccumuloClient().createBatchScanner(env.getTable(), this.authorizations, 1);
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
index 3defc897..71c46d27 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.accumulo.iterators.SnapshotIterator;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.api.data.Column;
@@ -51,10 +52,14 @@ public class SnapshotScanner implements Iterable<Entry<Key, Value>> {
private final Collection<Column> columns;
private final boolean showReadLocks;
- public Opts(Span span, Collection<Column> columns, boolean showReadLocks) {
+ private final Authorizations scanTimeAuthz;
+
+ public Opts(Span span, Collection<Column> columns, boolean showReadLocks,
+ Authorizations scanTimeAuthz) {
this.span = span;
this.columns = ImmutableSet.copyOf(columns);
this.showReadLocks = showReadLocks;
+ this.scanTimeAuthz = scanTimeAuthz;
}
public Span getSpan() {
@@ -111,7 +116,9 @@ public class SnapshotScanner implements Iterable<Entry<Key, Value>> {
private void setUpIterator() {
Scanner scanner;
try {
- scanner = env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
+ scanner = env.getAccumuloClient().createScanner(env.getTable(),
+ snapIterConfig.scanTimeAuthz == null ? env.getAuthorizations()
+ : snapIterConfig.scanTimeAuthz);
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
}
@@ -145,7 +152,8 @@ public class SnapshotScanner implements Iterable<Entry<Key, Value>> {
}
private void resetScanner(Span span) {
- snapIterConfig = new Opts(span, snapIterConfig.columns, snapIterConfig.showReadLocks);
+ snapIterConfig = new Opts(span, snapIterConfig.columns, snapIterConfig.showReadLocks,
+ snapIterConfig.scanTimeAuthz);
setUpIterator();
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 8c32aec3..6b6f37fe 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -15,6 +15,7 @@
package org.apache.fluo.core.impl;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -31,6 +32,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -50,6 +52,7 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.accumulo.iterators.PrewriteIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ColumnType;
@@ -140,11 +143,14 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
private boolean commitAttempted = false;
private AsyncReader asyncReader = null;
+ private Authorizations scanTimeAuthz;
- public TransactionImpl(Environment env, Notification trigger, long startTs) {
+ public TransactionImpl(Environment env, Notification trigger, long startTs,
+ Authorizations scanTimeAuthz) {
Objects.requireNonNull(env, "environment cannot be null");
Preconditions.checkArgument(startTs >= 0, "startTs cannot be negative");
this.env = env;
+ this.scanTimeAuthz = Objects.requireNonNull(scanTimeAuthz);
this.stats = new TxStats(env);
this.startTs = startTs;
this.observedColumns = env.getConfiguredObservers().getObservedColumns(STRONG);
@@ -164,15 +170,19 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
}
public TransactionImpl(Environment env, Notification trigger) {
- this(env, trigger, allocateTimestamp(env).getTxTimestamp());
+ this(env, trigger, allocateTimestamp(env).getTxTimestamp(), env.getAuthorizations());
}
public TransactionImpl(Environment env) {
- this(env, null, allocateTimestamp(env).getTxTimestamp());
+ this(env, null, allocateTimestamp(env).getTxTimestamp(), env.getAuthorizations());
}
public TransactionImpl(Environment env, long startTs) {
- this(env, null, startTs);
+ this(env, null, startTs, env.getAuthorizations());
+ }
+
+ /**
+ * Creates a transaction with a newly allocated start timestamp and no trigger notification that
+ * reads using the given authorizations.
+ *
+ * @param env the environment the transaction runs in
+ * @param scanTimeAuthz authorizations applied to reads; the delegated constructor rejects null
+ */
+ public TransactionImpl(Environment env, Authorizations scanTimeAuthz) {
+ this(env, null, allocateTimestamp(env).getTxTimestamp(), scanTimeAuthz);
}
private static Stamp allocateTimestamp(Environment env) {
@@ -204,7 +214,7 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
ParallelSnapshotScanner pss =
new ParallelSnapshotScanner(rows, columns, env, startTs, stats, readLocksSeen, kve -> {
- });
+ }, this.scanTimeAuthz);
Map<Bytes, Map<Column, Bytes>> ret = pss.scan();
@@ -246,9 +256,9 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
cols.add(column);
}
}
- opts = new SnapshotScanner.Opts(Span.exact(row), columns, true);
+ opts = new SnapshotScanner.Opts(Span.exact(row), columns, true, this.scanTimeAuthz);
} else {
- opts = new SnapshotScanner.Opts(Span.exact(row), columns, true);
+ opts = new SnapshotScanner.Opts(Span.exact(row), columns, true, this.scanTimeAuthz);
}
Map<Column, Bytes> ret = new HashMap<>();
@@ -283,8 +293,8 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
return Collections.emptyMap();
}
- ParallelSnapshotScanner pss =
- new ParallelSnapshotScanner(rowColumns, env, startTs, stats, readLocksSeen, writeLocksSeen);
+ ParallelSnapshotScanner pss = new ParallelSnapshotScanner(rowColumns, env, startTs, stats,
+ readLocksSeen, writeLocksSeen, this.scanTimeAuthz);
Map<Bytes, Map<Column, Bytes>> scan = pss.scan();
Map<RowColumn, Bytes> ret = new HashMap<>();
@@ -329,7 +339,7 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
@Override
public ScannerBuilder scanner() {
checkIfOpen();
- return new ScannerBuilderImpl(this);
+ return new ScannerBuilderImpl(this, this.scanTimeAuthz);
}
private void updateColumnsRead(Bytes row, Set<Column> columns) {
@@ -1386,6 +1396,7 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
@Override
public Collection<Mutation> createMutations(CommitData cd) {
+
long commitTs = getStats().getCommitTs();
ArrayList<Mutation> mutations = new ArrayList<>(updates.size() + 1);
for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
@@ -1569,7 +1580,28 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
});
}
- public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns) {
- return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns, false), startTs, stats);
+ public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns,
+ Authorizations scanTimeAuthz) {
+ return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns, false, scanTimeAuthz),
+ startTs, stats);
+ }
+
+ /**
+  * Replaces the authorizations this transaction passes to the scanners it creates.
+  *
+  * @param labels authorization labels to scan with; an empty collection selects
+  *        {@code Authorizations.EMPTY}
+  * @throws NullPointerException if {@code labels} is null
+  */
+ @Override
+ public void setScanTimeAuthorizations(Collection<String> labels) {
+   Objects.requireNonNull(labels, "Authorization tokens must not be null!");
+   // toArray never returns null, so the array itself needs no further null check.
+   String[] requestedAuthz = labels.toArray(new String[0]);
+   this.scanTimeAuthz =
+       requestedAuthz.length == 0 ? Authorizations.EMPTY : new Authorizations(requestedAuthz);
+ }
+
+ /**
+  * Returns the authorizations currently used for reads, each decoded from its UTF-8 bytes into a
+  * string and gathered into a set.
+  */
+ @Override
+ public Collection<String> getScanTimeAuthorizations() {
+   Collection<byte[]> rawAuths = this.scanTimeAuthz.getAuthorizations();
+   return rawAuths.stream().map(raw -> new String(raw, StandardCharsets.UTF_8))
+       .collect(Collectors.toSet());
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
index 420fb738..b0f4e163 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
@@ -21,6 +21,7 @@ import java.util.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.client.scanner.RowScannerBuilder;
import org.apache.fluo.api.client.scanner.ScannerBuilder;
@@ -37,10 +38,17 @@ public class ScannerBuilderImpl implements ScannerBuilder {
private Span span = EMPTY_SPAN;
private Collection<Column> columns = Collections.emptyList();
+ private Authorizations scanTimeAuthz = Authorizations.EMPTY;
+
public ScannerBuilderImpl(TransactionImpl tx) {
this.tx = tx;
}
+ /**
+ * Creates a builder whose scanners start out using the given authorizations instead of the
+ * field default {@code Authorizations.EMPTY}.
+ *
+ * @param tx the transaction the built scanners read through
+ * @param scanTimeAuthz authorizations handed to {@code tx.newSnapshotScanner} on build
+ */
+ public ScannerBuilderImpl(TransactionImpl tx, Authorizations scanTimeAuthz) {
+ this.tx = tx;
+ this.scanTimeAuthz = scanTimeAuthz;
+ }
+
@Override
public ScannerBuilder over(Span span) {
Objects.requireNonNull(span);
@@ -70,16 +78,26 @@ public class ScannerBuilderImpl implements ScannerBuilder {
return this;
}
+ /**
+  * Overrides the authorizations used by scanners built after this call.
+  *
+  * @param authLabels authorization labels to scan with; an empty collection selects
+  *        {@code Authorizations.EMPTY}
+  * @return this builder
+  * @throws NullPointerException if {@code authLabels} is null
+  */
+ public ScannerBuilder withLabels(Collection<String> authLabels) {
+   Objects.requireNonNull(authLabels);
+   if (authLabels.isEmpty()) {
+     this.scanTimeAuthz = Authorizations.EMPTY;
+   } else {
+     this.scanTimeAuthz = new Authorizations(authLabels.toArray(new String[0]));
+   }
+   return this;
+ }
+
@Override
public CellScanner build() {
- SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns);
+ SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns, scanTimeAuthz);
return new CellScannerImpl(snapScanner, columns);
}
@Override
public RowScannerBuilder byRow() {
return () -> {
- SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns);
+ SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns, scanTimeAuthz);
return new RowScannerImpl(snapScanner, columns);
};
}
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientAuthorizationsIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientAuthorizationsIT.java
new file mode 100644
index 00000000..ce37dbbf
--- /dev/null
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientAuthorizationsIT.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.client;
+
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.CommitException;
+import org.apache.fluo.core.util.AccumuloUtil;
+import org.apache.fluo.integration.ITBaseImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for scan time authorizations on snapshots and transactions. Each test runs
+ * against two rows ("bill" and "bob") that each hold a PRIVATE, a PUBLIC and an unlabeled column.
+ */
+public class FluoClientAuthorizationsIT extends ITBaseImpl {
+  @Rule
+  public Timeout globalTimeout = Timeout.seconds(getTestTimeout());
+
+  Column ssn = new Column("", "ssn", "PRIVATE");
+  Column name = new Column("", "name", "PUBLIC");
+  Column id = new Column("", "id");
+  FluoConfiguration conf;
+  FluoClient client;
+
+  /**
+   * Grants the Accumulo user the PRIVATE and PUBLIC authorizations, creates a client configured
+   * with the same authorizations, and writes the sample rows.
+   */
+  @Before
+  public void setupAuthorizations() throws Throwable {
+    try (AccumuloClient accumulo = AccumuloUtil.getClient(config)) {
+      accumulo.securityOperations().changeUserAuthorizations(config.getAccumuloUser(),
+          new Authorizations("PRIVATE", "PUBLIC"));
+    }
+    this.conf = new FluoConfiguration(config);
+    this.conf.setAccumuloAuthorizations("PRIVATE", "PUBLIC");
+    this.client = FluoFactory.newClient(this.conf);
+
+    writeSampleData();
+  }
+
+  /*
+   * Kind of sloppy because we assert some basic write functionality already works. However, this
+   * lets us re-use and organize tests that read this data better.
+   */
+  public void writeSampleData() {
+    try (Transaction txn = client.newTransaction()) {
+      txn.set("bill", ssn, "000-00-0001");
+      txn.set("bill", name, "william");
+      txn.set("bill", id, "1");
+      txn.set("bob", ssn, "000-00-0002");
+      txn.set("bob", name, "robert");
+      txn.set("bob", id, "2");
+      txn.commit();
+    }
+  }
+
+  /**
+   * Closes the client. The client is null when setup failed before creating it; skipping the
+   * close then keeps the original setup failure from being joined by a NullPointerException.
+   */
+  @After
+  public void cleanupClient() throws Throwable {
+    if (this.client != null) {
+      this.client.close();
+    }
+  }
+
+  /** With the client's configured authorizations, every column of both rows is readable. */
+  @Test
+  public void testBasicRead() {
+    try (Snapshot snapshot = client.newSnapshot()) {
+      assertEquals(ImmutableSet.of("PUBLIC", "PRIVATE"), snapshot.getScanTimeAuthorizations());
+      Map<Column, String> bill = snapshot.gets("bill");
+      assertTrue(bill.containsKey(name));
+      assertTrue(bill.containsKey(ssn));
+      assertTrue(bill.containsKey(id));
+      // Read the "bob" row here; reading "bill" a second time would make these checks vacuous.
+      Map<Column, String> bob = snapshot.gets("bob");
+      assertTrue(bob.containsKey(name));
+      assertTrue(bob.containsKey(ssn));
+      assertTrue(bob.containsKey(id));
+    }
+  }
+
+  /** Scanning with only PUBLIC returns the PUBLIC and unlabeled columns. */
+  @Test
+  public void testPublicRead() {
+    try (Snapshot snapshot = client.newSnapshot()) {
+      snapshot.setScanTimeAuthorizations(ImmutableList.of("PUBLIC"));
+      assertEquals(ImmutableSet.of("PUBLIC"), snapshot.getScanTimeAuthorizations());
+      Map<Column, String> bill = snapshot.gets("bill");
+      assertTrue(bill.containsKey(name));
+      assertTrue(bill.containsKey(id));
+      assertEquals(2, bill.size());
+    }
+  }
+
+  /** Scanning with only PRIVATE returns the PRIVATE and unlabeled columns. */
+  @Test
+  public void testPrivateRead() {
+    try (Snapshot snapshot = client.newSnapshot()) {
+      snapshot.setScanTimeAuthorizations(ImmutableList.of("PRIVATE"));
+      assertEquals(ImmutableSet.of("PRIVATE"), snapshot.getScanTimeAuthorizations());
+      Map<Column, String> bill = snapshot.gets("bill");
+      assertTrue(bill.containsKey(ssn));
+      assertTrue(bill.containsKey(id));
+      assertEquals(2, bill.size());
+    }
+  }
+
+  // had some initial uses where I checked for Authorizations.EMPTY instead of null
+  // or empty set of auths, which caused the underlying scanner to scan at max
+  // authorizations. I want this call to explicitly say "only read data that is
+  // unlabeled"
+  @Test
+  public void testScanningWithNoAuths() {
+    try (Snapshot snapshot = client.newSnapshot()) {
+      snapshot.setScanTimeAuthorizations(Collections.emptySet());
+      assertEquals(Collections.emptySet(), snapshot.getScanTimeAuthorizations());
+      Map<Column, String> bill = snapshot.gets("bill");
+      assertFalse(bill.containsKey(name));
+      assertFalse(bill.containsKey(ssn));
+      assertTrue(bill.containsKey(id));
+      Map<Column, String> bob = snapshot.gets("bob");
+      assertFalse(bob.containsKey(name));
+      assertFalse(bob.containsKey(ssn));
+      assertTrue(bob.containsKey(id));
+    }
+
+    // create a client with config that does not have any auths set. This tests the defaults when
+    // nothing was set for the client or snapshot.
+    try (FluoClient fc = FluoFactory.newClient(new FluoConfiguration(config))) {
+      try (Snapshot snapshot = fc.newSnapshot()) {
+        assertEquals(Collections.emptySet(), snapshot.getScanTimeAuthorizations());
+        Map<Column, String> bill = snapshot.gets("bill");
+        assertFalse(bill.containsKey(name));
+        assertFalse(bill.containsKey(ssn));
+        assertTrue(bill.containsKey(id));
+        Map<Column, String> bob = snapshot.gets("bob");
+        assertFalse(bob.containsKey(name));
+        assertFalse(bob.containsKey(ssn));
+        assertTrue(bob.containsKey(id));
+      }
+    }
+  }
+
+  /** Committing a column labeled UNREADABLE is expected to fail with a CommitException. */
+  @Test(expected = CommitException.class)
+  public void testWriteUnreadable() {
+    try (Transaction txn = client.newTransaction()) {
+      txn.set("bill", new Column("", "unreadable", "UNREADABLE"), "value");
+      txn.commit();
+    }
+  }
+}
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientIT.java
index da5271ca..ff8f094a 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientIT.java
@@ -15,15 +15,25 @@
package org.apache.fluo.integration.client;
+import com.google.common.collect.ImmutableList;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.CommitException;
import org.apache.fluo.api.exceptions.FluoException;
import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@@ -32,6 +42,15 @@ public class FluoClientIT extends ITBaseImpl {
@Rule
public Timeout globalTimeout = Timeout.seconds(getTestTimeout());
+
+ /** Grants the configured Accumulo user the PRIVATE and PUBLIC authorizations before each test. */
+ @Before
+ public void setupAuthorizations() throws Throwable {
+   Authorizations granted = new Authorizations("PRIVATE", "PUBLIC");
+   try (AccumuloClient accumulo = AccumuloUtil.getClient(config)) {
+     String user = config.getAccumuloUser();
+     accumulo.securityOperations().changeUserAuthorizations(user, granted);
+   }
+ }
+
@Test
public void testBasic() {
try (FluoClient client = FluoFactory.newClient(config)) {
@@ -83,4 +102,14 @@ public class FluoClientIT extends ITBaseImpl {
Logger.getLogger(FluoClientImpl.class).setLevel(clientLevel);
Logger.getLogger(FluoFactory.class).setLevel(factoryLevel);
}
+
+ /**
+ * Writes a PRIVATE-labeled column through a client created from the base test configuration,
+ * which this test does not give any authorizations, and expects the commit to fail with a
+ * CommitException.
+ */
+ @Test(expected = CommitException.class)
+ public void testWriteWithDefaultAuths() throws Throwable {
+ Column labeledColumn = new Column("data", "private_column", "PRIVATE");
+ // Both resources are closed in reverse order: the transaction first, then the client.
+ try (FluoClient client = FluoFactory.newClient(config);
+ Transaction txn = client.newTransaction()) {
+ txn.set("bill", labeledColumn, "a value");
+ txn.commit();
+ }
+ }
}