You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2023/01/15 04:08:15 UTC

[fluo] branch main updated: Adds scan time authorizations to transactions/snapshots (#1120)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluo.git


The following commit(s) were added to refs/heads/main by this push:
     new 4bdb9389 Adds scan time authorizations to transactions/snapshots (#1120)
4bdb9389 is described below

commit 4bdb938939801c531589281751677c25cbfd0565
Author: Bill S <wj...@users.noreply.github.com>
AuthorDate: Sat Jan 14 23:08:09 2023 -0500

    Adds scan time authorizations to transactions/snapshots (#1120)
    
    1. Allows a user to specify a collection of authorizations/visibility labels
       to use when scanning data. Writes are allowed so long as the current
       connection's authorizations satisfy any visibility expressions.
    2. Connection-wide authorizations are set in the fluo configuration and must
       be set on the underlying Accumulo user.
    
    
    Co-authored-by: Bill Slacum <bi...@glidedog.com>
---
 .../java/org/apache/fluo/api/client/Snapshot.java  |   2 +
 .../org/apache/fluo/api/client/SnapshotBase.java   |  20 +++
 .../apache/fluo/api/config/FluoConfiguration.java  |  23 +++
 .../fluo/api/config/SimpleConfiguration.java       |  28 ++++
 .../java/org/apache/fluo/command/ScanTest.java     |   4 +-
 .../apache/fluo/core/client/FluoClientImpl.java    |   3 +
 .../org/apache/fluo/core/impl/Environment.java     |  11 ++
 .../fluo/core/impl/ParallelSnapshotScanner.java    |  29 +++-
 .../org/apache/fluo/core/impl/SnapshotScanner.java |  14 +-
 .../org/apache/fluo/core/impl/TransactionImpl.java |  56 +++++--
 .../fluo/core/impl/scanner/ScannerBuilderImpl.java |  22 ++-
 .../client/FluoClientAuthorizationsIT.java         | 170 +++++++++++++++++++++
 .../fluo/integration/client/FluoClientIT.java      |  29 ++++
 13 files changed, 387 insertions(+), 24 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/Snapshot.java b/modules/api/src/main/java/org/apache/fluo/api/client/Snapshot.java
index 625cf626..824a14e2 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/Snapshot.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/Snapshot.java
@@ -15,6 +15,8 @@
 
 package org.apache.fluo.api.client;
 
+import java.util.Collection;
+
 /**
  * Allows users to read from a Fluo table at a certain point in time. Snapshot extends
  * {@link SnapshotBase} to include a {@link #close} method which must be called when you are
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
index 7c4a2269..ff0a694c 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java
@@ -203,4 +203,24 @@ public interface SnapshotBase {
   default CompletableFuture<Bytes> getAsync(Bytes row, Column column, Bytes defaultValue) {
     return CompletableFuture.completedFuture(get(row, column, defaultValue));
   }
+
+  /**
+   * All reads done using this snapshot after this call will use the passed in authorizations to
+   * filter data.
+   *
+   * @since 2.0.0
+   */
+  default void setScanTimeAuthorizations(Collection<String> authorizations) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns the set of scan time authorization that are currently in use for filtering data. The
+   * empty set indicates no filtering is being done using scan time authorizations.
+   *
+   * @since 2.0.0
+   */
+  default Collection<String> getScanTimeAuthorizations() {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
index 3cf944b5..b18b21c3 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
@@ -151,6 +151,9 @@ public class FluoConfiguration extends SimpleConfiguration {
    * @since 1.2.0
    */
   public static final String ACCUMULO_USER_PROP = ACCUMULO_PREFIX + ".user";
+
+  public static final String ACCUMULO_AUTH_PROP = ACCUMULO_PREFIX + ".auths";
+
   /**
    * @since 1.2.0
    */
@@ -486,6 +489,7 @@ public class FluoConfiguration extends SimpleConfiguration {
     return getDepNonEmptyString(ACCUMULO_USER_PROP, CLIENT_ACCUMULO_USER_PROP);
   }
 
+
   /**
    * Sets the Apache Accumulo password property {@value #ACCUMULO_PASSWORD_PROP}
    *
@@ -510,6 +514,25 @@ public class FluoConfiguration extends SimpleConfiguration {
     throw new NoSuchElementException(ACCUMULO_PASSWORD_PROP + " is not set!");
   }
 
+  /**
+   * @since 2.0.0
+   */
+  public FluoConfiguration setAccumuloAuthorizations(String... auths) {
+    setProperties(ACCUMULO_AUTH_PROP, auths);
+    return this;
+  }
+
+  /**
+   * @since 2.0.0
+   */
+  public String[] getAccumuloAuthorizations() {
+    if (containsKey(ACCUMULO_AUTH_PROP)) {
+      return this.getProperties(ACCUMULO_AUTH_PROP);
+    } else {
+      return new String[0];
+    }
+  }
+
   /**
    * Sets the value of the property {@value #ACCUMULO_ZOOKEEPERS_PROP}
    *
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java
index b48dd835..88aebb0b 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java
@@ -30,6 +30,7 @@ import java.io.StringReader;
 import java.io.UncheckedIOException;
 import java.io.Writer;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -242,6 +243,33 @@ public class SimpleConfiguration implements Serializable {
     internalConfig.setProperty(key, value);
   }
 
+  /**
+   * @since 2.0.0
+   */
+  public void setProperties(String key, String... values) {
+    Objects.requireNonNull(values, "Values for key `" + key + "` must be non-null.");
+    // don't let callers modify the array underneath of us
+    String[] copy = new String[values.length];
+    System.arraycopy(values, 0, copy, 0, copy.length);
+    for (String value : copy) {
+      Objects.requireNonNull(value, "Encountered null value for key `" + key + "`.");
+    }
+    internalConfig.setProperty(key, values);
+  }
+
+  /**
+   * @since 2.0.0
+   */
+  public String[] getProperties(String key) {
+    // TODO fix cast class; use Properties?
+    ArrayList<String> values = (ArrayList<String>) internalConfig.getProperty(key);
+    if (values == null) {
+      return new String[0];
+    } else {
+      return values.toArray(new String[values.size()]);
+    }
+  }
+
   /**
    * Returns a subset of config that start with given prefix. The prefix will not be present in keys
    * of the returned config. Any changes made to the returned config will be made to this and visa
diff --git a/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java b/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
index fabc3aaf..f7f1dd3f 100644
--- a/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
+++ b/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
@@ -16,6 +16,7 @@
 package org.apache.fluo.command;
 
 import com.beust.jcommander.JCommander;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
@@ -34,7 +35,8 @@ public class ScanTest {
     JCommander jcommand = new JCommander(scan);
     jcommand.parse(args.split(" "));
     ScanUtil.ScanOpts opts = scan.getScanOpts();
-    return new SnapshotScanner.Opts(ScanUtil.getSpan(opts), ScanUtil.getColumns(opts), false);
+    return new SnapshotScanner.Opts(ScanUtil.getSpan(opts), ScanUtil.getColumns(opts), false,
+        Authorizations.EMPTY);
   }
 
   @Test
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java
index 22749900..797f7ffb 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java
@@ -15,11 +15,14 @@
 
 package org.apache.fluo.core.client;
 
+import java.util.Collection;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.LoaderExecutor;
 import org.apache.fluo.api.client.Snapshot;
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
index f9a23162..9c35743e 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
@@ -18,10 +18,14 @@ package org.apache.fluo.core.impl;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Map.Entry;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -102,6 +106,13 @@ public class Environment implements AutoCloseable {
           + client.instanceOperations().getInstanceId() + " != " + accumuloInstanceID);
     }
 
+    String[] auths = config.getAccumuloAuthorizations();
+    if (auths.length == 0) {
+      this.auths = new Authorizations();
+    } else {
+      this.auths = new Authorizations(auths);
+    }
+
     try {
       resources = new SharedResources(this);
     } catch (TableNotFoundException e1) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
index 4d2b37ce..17f4620f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
@@ -56,9 +57,11 @@ public class ParallelSnapshotScanner {
   private Map<Bytes, Set<Column>> readLocksSeen;
   private Consumer<Entry<Key, Value>> writeLocksSeen;
 
+  private Authorizations authorizations;
+
   ParallelSnapshotScanner(Collection<Bytes> rows, Set<Column> columns, Environment env,
       long startTs, TxStats stats, Map<Bytes, Set<Column>> readLocksSeen,
-      Consumer<Entry<Key, Value>> writeLocksSeen) {
+      Consumer<Entry<Key, Value>> writeLocksSeen, Authorizations authorizations) {
     this.rows = rows;
     this.columns = columns;
     this.env = env;
@@ -68,19 +71,33 @@ public class ParallelSnapshotScanner {
     this.columnConverter = new CachedColumnConverter(columns);
     this.readLocksSeen = readLocksSeen;
     this.writeLocksSeen = writeLocksSeen;
+    this.authorizations = authorizations;
+  }
+
+  ParallelSnapshotScanner(Collection<Bytes> rows, Set<Column> columns, Environment env,
+      long startTs, TxStats stats, Map<Bytes, Set<Column>> readLocksSeen,
+      Consumer<Entry<Key, Value>> writeLocksSeen) {
+    this(rows, columns, env, startTs, stats, readLocksSeen, writeLocksSeen,
+        env.getAuthorizations());
   }
 
   ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long startTs, TxStats stats,
       Map<Bytes, Set<Column>> readLocksSeen, Consumer<Entry<Key, Value>> writeLocksSeen) {
+    this(cells, env, startTs, stats, readLocksSeen, writeLocksSeen, env.getAuthorizations());
+  }
+
+  ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long startTs, TxStats stats,
+      Map<Bytes, Set<Column>> readLocksSeen, Consumer<Entry<Key, Value>> writeLocksSeen,
+      Authorizations authorizations) {
     for (RowColumn rc : cells) {
       byte[] r = rc.getRow().toArray();
       byte[] cf = rc.getColumn().getFamily().toArray();
       byte[] cq = rc.getColumn().getQualifier().toArray();
-      byte[] cv = rc.getColumn().getVisibility().toArray();
+      byte[] cv = new byte[0];
+      byte[] cv2 = new byte[] {(byte) 0xff};
 
       Key start = new Key(r, cf, cq, cv, Long.MAX_VALUE, false, false);
-      Key end = new Key(start);
-      end.setTimestamp(Long.MIN_VALUE);
+      Key end = new Key(r, cf, cq, cv2, Long.MIN_VALUE, false, false);
 
       rangesToScan.add(new Range(start, true, end, true));
     }
@@ -92,6 +109,7 @@ public class ParallelSnapshotScanner {
     this.columnConverter = ColumnUtil::convert;
     this.readLocksSeen = readLocksSeen;
     this.writeLocksSeen = writeLocksSeen;
+    this.authorizations = authorizations;
   }
 
   private BatchScanner setupBatchScanner() {
@@ -100,8 +118,7 @@ public class ParallelSnapshotScanner {
     try {
       // TODO hardcoded number of threads!
       // one thread is probably good.. going for throughput
-      scanner =
-          env.getAccumuloClient().createBatchScanner(env.getTable(), env.getAuthorizations(), 1);
+      scanner = env.getAccumuloClient().createBatchScanner(env.getTable(), this.authorizations, 1);
     } catch (TableNotFoundException e) {
       throw new RuntimeException(e);
     }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
index 3defc897..71c46d27 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.accumulo.iterators.SnapshotIterator;
 import org.apache.fluo.accumulo.util.ColumnType;
 import org.apache.fluo.api.data.Column;
@@ -51,10 +52,14 @@ public class SnapshotScanner implements Iterable<Entry<Key, Value>> {
     private final Collection<Column> columns;
     private final boolean showReadLocks;
 
-    public Opts(Span span, Collection<Column> columns, boolean showReadLocks) {
+    private final Authorizations scanTimeAuthz;
+
+    public Opts(Span span, Collection<Column> columns, boolean showReadLocks,
+        Authorizations scanTimeAuthz) {
       this.span = span;
       this.columns = ImmutableSet.copyOf(columns);
       this.showReadLocks = showReadLocks;
+      this.scanTimeAuthz = scanTimeAuthz;
     }
 
     public Span getSpan() {
@@ -111,7 +116,9 @@ public class SnapshotScanner implements Iterable<Entry<Key, Value>> {
     private void setUpIterator() {
       Scanner scanner;
       try {
-        scanner = env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
+        scanner = env.getAccumuloClient().createScanner(env.getTable(),
+            snapIterConfig.scanTimeAuthz == null ? env.getAuthorizations()
+                : snapIterConfig.scanTimeAuthz);
       } catch (TableNotFoundException e) {
         throw new RuntimeException(e);
       }
@@ -145,7 +152,8 @@ public class SnapshotScanner implements Iterable<Entry<Key, Value>> {
     }
 
     private void resetScanner(Span span) {
-      snapIterConfig = new Opts(span, snapIterConfig.columns, snapIterConfig.showReadLocks);
+      snapIterConfig = new Opts(span, snapIterConfig.columns, snapIterConfig.showReadLocks,
+          snapIterConfig.scanTimeAuthz);
       setUpIterator();
     }
 
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 8c32aec3..6b6f37fe 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -15,6 +15,7 @@
 
 package org.apache.fluo.core.impl;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -31,6 +32,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -50,6 +52,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.accumulo.iterators.PrewriteIterator;
 import org.apache.fluo.accumulo.util.ColumnConstants;
 import org.apache.fluo.accumulo.util.ColumnType;
@@ -140,11 +143,14 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
   private boolean commitAttempted = false;
   private AsyncReader asyncReader = null;
 
+  private Authorizations scanTimeAuthz;
 
-  public TransactionImpl(Environment env, Notification trigger, long startTs) {
+  public TransactionImpl(Environment env, Notification trigger, long startTs,
+      Authorizations scanTimeAuthz) {
     Objects.requireNonNull(env, "environment cannot be null");
     Preconditions.checkArgument(startTs >= 0, "startTs cannot be negative");
     this.env = env;
+    this.scanTimeAuthz = Objects.requireNonNull(scanTimeAuthz);
     this.stats = new TxStats(env);
     this.startTs = startTs;
     this.observedColumns = env.getConfiguredObservers().getObservedColumns(STRONG);
@@ -164,15 +170,19 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
   }
 
   public TransactionImpl(Environment env, Notification trigger) {
-    this(env, trigger, allocateTimestamp(env).getTxTimestamp());
+    this(env, trigger, allocateTimestamp(env).getTxTimestamp(), env.getAuthorizations());
   }
 
   public TransactionImpl(Environment env) {
-    this(env, null, allocateTimestamp(env).getTxTimestamp());
+    this(env, null, allocateTimestamp(env).getTxTimestamp(), env.getAuthorizations());
   }
 
   public TransactionImpl(Environment env, long startTs) {
-    this(env, null, startTs);
+    this(env, null, startTs, env.getAuthorizations());
+  }
+
+  public TransactionImpl(Environment env, Authorizations scanTimeAuthz) {
+    this(env, null, allocateTimestamp(env).getTxTimestamp(), scanTimeAuthz);
   }
 
   private static Stamp allocateTimestamp(Environment env) {
@@ -204,7 +214,7 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
 
     ParallelSnapshotScanner pss =
         new ParallelSnapshotScanner(rows, columns, env, startTs, stats, readLocksSeen, kve -> {
-        });
+        }, this.scanTimeAuthz);
 
     Map<Bytes, Map<Column, Bytes>> ret = pss.scan();
 
@@ -246,9 +256,9 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
           cols.add(column);
         }
       }
-      opts = new SnapshotScanner.Opts(Span.exact(row), columns, true);
+      opts = new SnapshotScanner.Opts(Span.exact(row), columns, true, this.scanTimeAuthz);
     } else {
-      opts = new SnapshotScanner.Opts(Span.exact(row), columns, true);
+      opts = new SnapshotScanner.Opts(Span.exact(row), columns, true, this.scanTimeAuthz);
     }
 
     Map<Column, Bytes> ret = new HashMap<>();
@@ -283,8 +293,8 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
       return Collections.emptyMap();
     }
 
-    ParallelSnapshotScanner pss =
-        new ParallelSnapshotScanner(rowColumns, env, startTs, stats, readLocksSeen, writeLocksSeen);
+    ParallelSnapshotScanner pss = new ParallelSnapshotScanner(rowColumns, env, startTs, stats,
+        readLocksSeen, writeLocksSeen, this.scanTimeAuthz);
 
     Map<Bytes, Map<Column, Bytes>> scan = pss.scan();
     Map<RowColumn, Bytes> ret = new HashMap<>();
@@ -329,7 +339,7 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
   @Override
   public ScannerBuilder scanner() {
     checkIfOpen();
-    return new ScannerBuilderImpl(this);
+    return new ScannerBuilderImpl(this, this.scanTimeAuthz);
   }
 
   private void updateColumnsRead(Bytes row, Set<Column> columns) {
@@ -1386,6 +1396,7 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
 
     @Override
     public Collection<Mutation> createMutations(CommitData cd) {
+
       long commitTs = getStats().getCommitTs();
       ArrayList<Mutation> mutations = new ArrayList<>(updates.size() + 1);
       for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
@@ -1569,7 +1580,28 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
     });
   }
 
-  public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns) {
-    return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns, false), startTs, stats);
+  public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns,
+      Authorizations scanTimeAuthz) {
+    return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns, false, scanTimeAuthz),
+        startTs, stats);
+  }
+
+  @Override
+  public void setScanTimeAuthorizations(Collection<String> labels) {
+    Objects.requireNonNull(labels, "Authorization tokens must not be null!");
+    String[] requestedAuthz = Iterables.toArray(labels, String.class);
+    if (requestedAuthz != null) {
+      if (requestedAuthz.length == 0) {
+        this.scanTimeAuthz = Authorizations.EMPTY;
+      } else {
+        this.scanTimeAuthz = new Authorizations(requestedAuthz);
+      }
+    }
+  }
+
+  @Override
+  public Collection<String> getScanTimeAuthorizations() {
+    return this.scanTimeAuthz.getAuthorizations().stream()
+        .map(auth -> new String(auth, StandardCharsets.UTF_8)).collect(Collectors.toSet());
   }
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
index 420fb738..b0f4e163 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java
@@ -21,6 +21,7 @@ import java.util.Objects;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.client.scanner.RowScannerBuilder;
 import org.apache.fluo.api.client.scanner.ScannerBuilder;
@@ -37,10 +38,17 @@ public class ScannerBuilderImpl implements ScannerBuilder {
   private Span span = EMPTY_SPAN;
   private Collection<Column> columns = Collections.emptyList();
 
+  private Authorizations scanTimeAuthz = Authorizations.EMPTY;
+
   public ScannerBuilderImpl(TransactionImpl tx) {
     this.tx = tx;
   }
 
+  public ScannerBuilderImpl(TransactionImpl tx, Authorizations scanTimeAuthz) {
+    this.tx = tx;
+    this.scanTimeAuthz = scanTimeAuthz;
+  }
+
   @Override
   public ScannerBuilder over(Span span) {
     Objects.requireNonNull(span);
@@ -70,16 +78,26 @@ public class ScannerBuilderImpl implements ScannerBuilder {
     return this;
   }
 
+  public ScannerBuilder withLabels(Collection<String> authLables) {
+    Objects.requireNonNull(authLables);
+    if (authLables.isEmpty()) {
+      this.scanTimeAuthz = Authorizations.EMPTY;
+    } else {
+      this.scanTimeAuthz = new Authorizations(authLables.toArray(new String[authLables.size()]));
+    }
+    return this;
+  }
+
   @Override
   public CellScanner build() {
-    SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns);
+    SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns, scanTimeAuthz);
     return new CellScannerImpl(snapScanner, columns);
   }
 
   @Override
   public RowScannerBuilder byRow() {
     return () -> {
-      SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns);
+      SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns, scanTimeAuthz);
       return new RowScannerImpl(snapScanner, columns);
     };
   }
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientAuthorizationsIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientAuthorizationsIT.java
new file mode 100644
index 00000000..ce37dbbf
--- /dev/null
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientAuthorizationsIT.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.client;
+
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.CommitException;
+import org.apache.fluo.core.util.AccumuloUtil;
+import org.apache.fluo.integration.ITBaseImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FluoClientAuthorizationsIT extends ITBaseImpl {
+  @Rule
+  public Timeout globalTimeout = Timeout.seconds(getTestTimeout());
+
+  Column ssn = new Column("", "ssn", "PRIVATE");
+  Column name = new Column("", "name", "PUBLIC");
+  Column id = new Column("", "id");
+  FluoConfiguration conf;
+  FluoClient client;
+
+  @Before
+  public void setupAuthorizations() throws Throwable {
+    try (AccumuloClient accumulo = AccumuloUtil.getClient(config)) {
+      accumulo.securityOperations().changeUserAuthorizations(config.getAccumuloUser(),
+          new Authorizations("PRIVATE", "PUBLIC"));
+    }
+    this.conf = new FluoConfiguration(config);
+    this.conf.setAccumuloAuthorizations("PRIVATE", "PUBLIC");
+    this.client = FluoFactory.newClient(this.conf);
+
+    writeSampleData();
+  }
+
+  /*
+   * Kind of sloppy because we assert some basic write functionality already works. However, this
+   * lets us re-use and organize tests that read this data better.
+   */
+  public void writeSampleData() {
+    try (Transaction txn = client.newTransaction()) {
+      txn.set("bill", ssn, "000-00-0001");
+      txn.set("bill", name, "william");
+      txn.set("bill", id, "1");
+      txn.set("bob", ssn, "000-00-0002");
+      txn.set("bob", name, "robert");
+      txn.set("bob", id, "2");
+      txn.commit();
+    }
+  }
+
+  @After
+  public void cleanupClient() throws Throwable {
+    this.client.close();
+  }
+
+  @Test
+  public void testBasicRead() {
+    try (Snapshot snapshot = client.newSnapshot()) {
+      assertEquals(ImmutableSet.of("PUBLIC", "PRIVATE"), snapshot.getScanTimeAuthorizations());
+      Map<Column, String> bill = snapshot.gets("bill");
+      assertTrue(bill.containsKey(name));
+      assertTrue(bill.containsKey(ssn));
+      assertTrue(bill.containsKey(id));
+      Map<Column, String> bob = snapshot.gets("bill");
+      assertTrue(bob.containsKey(name));
+      assertTrue(bob.containsKey(ssn));
+      assertTrue(bob.containsKey(id));
+    }
+  }
+
+  @Test
+  public void testPublicRead() {
+    try (Snapshot snapshot = client.newSnapshot()) {
+      snapshot.setScanTimeAuthorizations(ImmutableList.of("PUBLIC"));
+      assertEquals(ImmutableSet.of("PUBLIC"), snapshot.getScanTimeAuthorizations());
+      Map<Column, String> bill = snapshot.gets("bill");
+      assertTrue(bill.containsKey(name));
+      assertTrue(bill.containsKey(id));
+      assertEquals(2, bill.size());
+    }
+  }
+
+  @Test
+  public void testPrivateRead() {
+    try (Snapshot snapshot = client.newSnapshot()) {
+      snapshot.setScanTimeAuthorizations(ImmutableList.of("PRIVATE"));
+      assertEquals(ImmutableSet.of("PRIVATE"), snapshot.getScanTimeAuthorizations());
+      Map<Column, String> bill = snapshot.gets("bill");
+      assertTrue(bill.containsKey(ssn));
+      assertTrue(bill.containsKey(id));
+      assertEquals(2, bill.size());
+    }
+  }
+
+  // had some initial uses where I checked for Authorizations.EMPTY instead of null
+  // or empty set of auths, which caused the underlying scanner to scan at max
+  // authorizations. I want this call to explicitly say "only read data that is
+  // unlabeled"
+  @Test
+  public void testScanningWithNoAuths() {
+    try (Snapshot snapshot = client.newSnapshot()) {
+      snapshot.setScanTimeAuthorizations(Collections.emptySet());
+      assertEquals(Collections.emptySet(), snapshot.getScanTimeAuthorizations());
+      Map<Column, String> bill = snapshot.gets("bill");
+      assertFalse(bill.containsKey(name));
+      assertFalse(bill.containsKey(ssn));
+      assertTrue(bill.containsKey(id));
+      Map<Column, String> bob = snapshot.gets("bob");
+      assertFalse(bob.containsKey(name));
+      assertFalse(bob.containsKey(ssn));
+      assertTrue(bob.containsKey(id));
+    }
+
+    // create a client with config that does not have any auths set. This test the defaults when
+    // nothing was set for the client or snapshot.
+    try (FluoClient fc = FluoFactory.newClient(new FluoConfiguration(config))) {
+      try (Snapshot snapshot = fc.newSnapshot()) {
+        assertEquals(Collections.emptySet(), snapshot.getScanTimeAuthorizations());
+        Map<Column, String> bill = snapshot.gets("bill");
+        assertFalse(bill.containsKey(name));
+        assertFalse(bill.containsKey(ssn));
+        assertTrue(bill.containsKey(id));
+        Map<Column, String> bob = snapshot.gets("bob");
+        assertFalse(bob.containsKey(name));
+        assertFalse(bob.containsKey(ssn));
+        assertTrue(bob.containsKey(id));
+      }
+    }
+  }
+
+  @Test(expected = CommitException.class)
+  public void testWriteUnreadable() {
+    try (Transaction txn = client.newTransaction()) {
+      txn.set("bill", new Column("", "unreadable", "UNREADABLE"), "value");
+      txn.commit();
+    }
+  }
+}
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientIT.java
index da5271ca..ff8f094a 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientIT.java
@@ -15,15 +15,25 @@
 
 package org.apache.fluo.integration.client;
 
+import com.google.common.collect.ImmutableList;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.CommitException;
 import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.fluo.core.util.AccumuloUtil;
 import org.apache.fluo.integration.ITBaseImpl;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
@@ -32,6 +42,15 @@ public class FluoClientIT extends ITBaseImpl {
   @Rule
   public Timeout globalTimeout = Timeout.seconds(getTestTimeout());
 
+
+  @Before
+  public void setupAuthorizations() throws Throwable {
+    try (AccumuloClient accumulo = AccumuloUtil.getClient(config)) {
+      accumulo.securityOperations().changeUserAuthorizations(config.getAccumuloUser(),
+          new Authorizations("PRIVATE", "PUBLIC"));
+    }
+  }
+
   @Test
   public void testBasic() {
     try (FluoClient client = FluoFactory.newClient(config)) {
@@ -83,4 +102,14 @@ public class FluoClientIT extends ITBaseImpl {
     Logger.getLogger(FluoClientImpl.class).setLevel(clientLevel);
     Logger.getLogger(FluoFactory.class).setLevel(factoryLevel);
   }
+
+  @Test(expected = CommitException.class)
+  public void testWriteWithDefaultAuths() throws Throwable {
+    Column labeledColumn = new Column("data", "private_column", "PRIVATE");
+    try (FluoClient client = FluoFactory.newClient(config);
+        Transaction txn = client.newTransaction()) {
+      txn.set("bill", labeledColumn, "a value");
+      txn.commit();
+    }
+  }
 }