You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/08/18 07:50:10 UTC
accumulo git commit: ACCUMULO-3961 Introduce CfCqColumnSliceFilter
and CfCqSliceSeekingFilter.
Repository: accumulo
Updated Branches:
refs/heads/master 681890322 -> 89f44e24f
ACCUMULO-3961 Introduce CfCqColumnSliceFilter and CfCqSliceSeekingFilter.
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/89f44e24
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/89f44e24
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/89f44e24
Branch: refs/heads/master
Commit: 89f44e24f711dfc4bc61fb1d20b98dda5518a236
Parents: 6818903
Author: Russ Weeks <rw...@phemi.com>
Authored: Wed Jul 22 20:53:01 2015 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Tue Aug 18 01:49:32 2015 -0400
----------------------------------------------------------------------
.../core/iterators/user/CfCqSliceFilter.java | 99 +++++
.../core/iterators/user/CfCqSliceOpts.java | 125 ++++++
.../iterators/user/CfCqSliceSeekingFilter.java | 134 ++++++
.../core/iterators/user/SeekingFilter.java | 220 ++++++++++
.../core/iterators/user/TestCfCqSlice.java | 415 +++++++++++++++++++
.../iterators/user/TestCfCqSliceFilter.java | 24 ++
.../user/TestCfCqSliceSeekingFilter.java | 24 ++
7 files changed, 1041 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/89f44e24/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceFilter.java
new file mode 100644
index 0000000..a323028
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceFilter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.OptionDescriber;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * An iterator that keeps only the entries whose column family lies inside a configured range of families and whose column qualifier lies inside a configured
+ * range of qualifiers. An entry has to satisfy both ranges to get through. When the families of interest form a small, well-defined set, isolating them with
+ * locality groups is much more efficient than scanning over them with this iterator.
+ *
+ * @see org.apache.accumulo.core.iterators.user.CfCqSliceOpts for a description of this iterator's options.
+ */
+public class CfCqSliceFilter extends Filter implements OptionDescriber {
+
+  /** Slice bounds parsed from the iterator options; assigned in init() and copied in deepCopy(). */
+  private CfCqSliceOpts cso;
+
+  /**
+   * Initializes the parent filter and then parses the slice bounds out of the supplied options.
+   */
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    cso = new CfCqSliceOpts(options);
+  }
+
+  /**
+   * Accepts the entry only when both its column family and its column qualifier are inside the slice. The value is not inspected.
+   */
+  @Override
+  public boolean accept(Key k, Value v) {
+    return PartialKey.ROW_COLFAM_COLQUAL == isKeyInSlice(k);
+  }
+
+  /**
+   * Copies the parent filter and gives the copy its own, independent set of slice bounds.
+   */
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    CfCqSliceFilter copy = (CfCqSliceFilter) super.deepCopy(env);
+    copy.cso = new CfCqSliceOpts(cso);
+    return copy;
+  }
+
+  /**
+   * Reports how much of the key lies inside the slice. A bound whose text is empty is treated as "no bound" and is never compared.
+   *
+   * @param k
+   *          the key to classify
+   * @return {@code ROW} when the column family is outside its range, {@code ROW_COLFAM} when the family is inside but the qualifier is outside, and
+   *         {@code ROW_COLFAM_COLQUAL} when both are inside
+   */
+  private PartialKey isKeyInSlice(Key k) {
+    // column family checks
+    if (cso.minCf.getLength() > 0 && isBelowSlice(k.compareColumnFamily(cso.minCf))) {
+      return PartialKey.ROW;
+    }
+    if (cso.maxCf.getLength() > 0 && isAboveSlice(k.compareColumnFamily(cso.maxCf))) {
+      return PartialKey.ROW;
+    }
+    // the family is inside the slice; column qualifier checks
+    if (cso.minCq.getLength() > 0 && isBelowSlice(k.compareColumnQualifier(cso.minCq))) {
+      return PartialKey.ROW_COLFAM;
+    }
+    if (cso.maxCq.getLength() > 0 && isAboveSlice(k.compareColumnQualifier(cso.maxCq))) {
+      return PartialKey.ROW_COLFAM;
+    }
+    // the qualifier is inside the slice as well
+    return PartialKey.ROW_COLFAM_COLQUAL;
+  }
+
+  /**
+   * Tells whether a comparison against a lower bound puts the key before the slice. The same minInclusive flag governs both the family and the qualifier bound.
+   */
+  private boolean isBelowSlice(int cmpToMin) {
+    return cmpToMin < 0 || (cmpToMin == 0 && !cso.minInclusive);
+  }
+
+  /**
+   * Tells whether a comparison against an upper bound puts the key after the slice. The same maxInclusive flag governs both the family and the qualifier bound.
+   */
+  private boolean isAboveSlice(int cmpToMax) {
+    return cmpToMax > 0 || (cmpToMax == 0 && !cso.maxInclusive);
+  }
+
+  /** Delegates to the option describer shared with the other slice iterator. */
+  @Override
+  public IteratorOptions describeOptions() {
+    CfCqSliceOpts.Describer describer = new CfCqSliceOpts.Describer();
+    return describer.describeOptions();
+  }
+
+  /** Delegates to the option describer shared with the other slice iterator. */
+  @Override
+  public boolean validateOptions(Map<String,String> options) {
+    CfCqSliceOpts.Describer describer = new CfCqSliceOpts.Describer();
+    return describer.validateOptions(options);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/89f44e24/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceOpts.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceOpts.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceOpts.java
new file mode 100644
index 0000000..c70e4bd
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceOpts.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import org.apache.accumulo.core.iterators.OptionDescriber;
+import org.apache.hadoop.io.Text;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Parsed form of the options understood by CfCqSliceFilter and CfCqSliceSeekingFilter: an optional minimum and maximum column family, an optional minimum and
+ * maximum column qualifier, and two flags saying whether the minimum and maximum bounds are themselves part of the slice. A bound that is absent (or empty) is
+ * stored as an empty Text, which the filters treat as "no bound".
+ */
+public class CfCqSliceOpts {
+  public static final String OPT_MIN_CF = "minCf";
+  public static final String OPT_MIN_CF_DESC = "UTF-8 encoded string representing minimum column family. "
+      + "Optional parameter. If minCf and minCq are undefined, the column slice will start at the first column "
+      + "of each row. If you want to do an exact match on column families, it's more efficient to leave minCf "
+      + "and maxCf undefined and use the scanner's fetchColumnFamily method.";
+
+  public static final String OPT_MIN_CQ = "minCq";
+  public static final String OPT_MIN_CQ_DESC = "UTF-8 encoded string representing minimum column qualifier. "
+      + "Optional parameter. If minCf and minCq are undefined, the column slice will start at the first column of each row.";
+
+  public static final String OPT_MAX_CF = "maxCf";
+  // This text used to be a verbatim copy of the minCf description and talked about the start of the slice.
+  public static final String OPT_MAX_CF_DESC = "UTF-8 encoded string representing maximum column family. "
+      + "Optional parameter. If maxCf and maxCq are undefined, the column slice will end at the last column "
+      + "of each row. If you want to do an exact match on column families, it's more efficient to leave minCf "
+      + "and maxCf undefined and use the scanner's fetchColumnFamily method.";
+
+  public static final String OPT_MAX_CQ = "maxCq";
+  public static final String OPT_MAX_CQ_DESC = "UTF-8 encoded string representing maximum column qualifier. "
+      + "Optional parameter. If maxCf and maxCq are undefined, the column slice will end at the last column of each row.";
+
+  public static final String OPT_MIN_INCLUSIVE = "minInclusive";
+  public static final String OPT_MIN_INCLUSIVE_DESC = "UTF-8 encoded string indicating whether to include the "
+      + "minimum column in the slice range. Optional parameter, default is true.";
+
+  public static final String OPT_MAX_INCLUSIVE = "maxInclusive";
+  public static final String OPT_MAX_INCLUSIVE_DESC = "UTF-8 encoded string indicating whether to include the "
+      + "maximum column in the slice range. Optional parameter, default is true.";
+
+  // Lower bounds; an empty Text means "unbounded".
+  Text minCf;
+  Text minCq;
+
+  // Upper bounds; an empty Text means "unbounded".
+  Text maxCf;
+  Text maxCq;
+
+  // Whether a key that compares equal to a lower / upper bound belongs to the slice.
+  boolean minInclusive;
+  boolean maxInclusive;
+
+  /**
+   * Copy constructor. Every bound is duplicated, so the new instance shares no Text with the original.
+   *
+   * @param o
+   *          the options to copy
+   */
+  public CfCqSliceOpts(CfCqSliceOpts o) {
+    minCf = new Text(o.minCf);
+    minCq = new Text(o.minCq);
+    maxCf = new Text(o.maxCf);
+    maxCq = new Text(o.maxCq);
+    minInclusive = o.minInclusive;
+    maxInclusive = o.maxInclusive;
+  }
+
+  /**
+   * Parses the slice options out of an iterator option map.
+   *
+   * @param options
+   *          iterator options; any of the OPT_* keys may be missing
+   */
+  public CfCqSliceOpts(Map<String,String> options) {
+    minCf = textOption(options, OPT_MIN_CF);
+    minCq = textOption(options, OPT_MIN_CQ);
+    maxCf = textOption(options, OPT_MAX_CF);
+    maxCq = textOption(options, OPT_MAX_CQ);
+    minInclusive = inclusiveOption(options, OPT_MIN_INCLUSIVE);
+    maxInclusive = inclusiveOption(options, OPT_MAX_INCLUSIVE);
+  }
+
+  /** Returns the UTF-8 bytes of the named option as a Text, or an empty Text when the option is not set. */
+  private static Text textOption(Map<String,String> options, String name) {
+    String raw = options.get(name);
+    return raw == null ? new Text() : new Text(raw.getBytes(UTF_8));
+  }
+
+  /** Returns the named flag, defaulting to true when the option is missing or empty. */
+  private static boolean inclusiveOption(Map<String,String> options, String name) {
+    String raw = options.get(name);
+    return raw == null || raw.isEmpty() || Boolean.parseBoolean(raw);
+  }
+
+  /**
+   * Describes and validates the slice options on behalf of the iterators that use them.
+   */
+  static class Describer implements OptionDescriber {
+    @Override
+    public OptionDescriber.IteratorOptions describeOptions() {
+      Map<String,String> options = new HashMap<String,String>();
+      options.put(OPT_MIN_CF, OPT_MIN_CF_DESC);
+      options.put(OPT_MIN_CQ, OPT_MIN_CQ_DESC);
+      options.put(OPT_MAX_CF, OPT_MAX_CF_DESC);
+      options.put(OPT_MAX_CQ, OPT_MAX_CQ_DESC);
+      options.put(OPT_MIN_INCLUSIVE, OPT_MIN_INCLUSIVE_DESC);
+      options.put(OPT_MAX_INCLUSIVE, OPT_MAX_INCLUSIVE_DESC);
+      return new OptionDescriber.IteratorOptions("ColumnSliceFilter", "Returns all key/value pairs where the column is between the specified values", options,
+          Collections.<String> emptyList());
+    }
+
+    /**
+     * Checks that the options describe a sensible slice.
+     *
+     * @param options
+     *          the iterator options to check
+     * @return false when maxInclusive is false although neither maxCf nor maxCq is set, or when a minimum sorts after its (non-empty) maximum; true otherwise
+     */
+    @Override
+    public boolean validateOptions(Map<String,String> options) {
+      CfCqSliceOpts o = new CfCqSliceOpts(options);
+      // If neither a max CF nor a max CQ is given there is no upper bound to the slice, so there is nothing that maxInclusive=false could exclude. Reject that
+      // combination. (The previous check was inverted: it rejected an exclusive maximum exactly when both upper bounds were present.)
+      boolean unboundedAbove = o.maxCf.getLength() == 0 && o.maxCq.getLength() == 0;
+      boolean boundsOk = !unboundedAbove || o.maxInclusive;
+      // An empty maximum means "unbounded", so any minimum is acceptable in that case.
+      boolean cqRangeOk = o.maxCq.getLength() == 0 || (o.minCq.compareTo(o.maxCq) < 1);
+      boolean cfRangeOk = o.maxCf.getLength() == 0 || (o.minCf.compareTo(o.maxCf) < 1);
+      return boundsOk && cqRangeOk && cfRangeOk;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/89f44e24/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceSeekingFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceSeekingFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceSeekingFilter.java
new file mode 100644
index 0000000..e5c4969
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/CfCqSliceSeekingFilter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.OptionDescriber;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Filters key/value pairs for a range of column families and a range of column qualifiers. Only keys which fall in both ranges will be passed by the filter.
+ * Note that if you have a small, well-defined set of column families it will be much more efficient to configure locality groups to isolate that data instead
+ * of configuring this iterator to seek over it.
+ *
+ * This filter may be more efficient than the CfCqSliceFilter or the ColumnSlice filter for small slices of large rows as it will seek to the next potential
+ * match once it determines that it has iterated past the end of a slice.
+ *
+ * @see org.apache.accumulo.core.iterators.user.CfCqSliceOpts for a description of this iterator's options.
+ */
+public class CfCqSliceSeekingFilter extends SeekingFilter implements OptionDescriber {
+
+  private static final FilterResult SKIP_TO_HINT = FilterResult.of(false, AdvanceResult.USE_HINT);
+  private static final FilterResult SKIP_TO_NEXT = FilterResult.of(false, AdvanceResult.NEXT);
+  private static final FilterResult SKIP_TO_NEXT_ROW = FilterResult.of(false, AdvanceResult.NEXT_ROW);
+  private static final FilterResult SKIP_TO_NEXT_CF = FilterResult.of(false, AdvanceResult.NEXT_CF);
+  private static final FilterResult INCLUDE_AND_NEXT = FilterResult.of(true, AdvanceResult.NEXT);
+
+  /** Slice bounds parsed from the iterator options; assigned in init() and copied in deepCopy(). */
+  private CfCqSliceOpts cso;
+
+  /**
+   * Initializes the parent seeking filter and then parses the slice bounds out of the supplied options.
+   */
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    cso = new CfCqSliceOpts(options);
+  }
+
+  /**
+   * Decides whether the key lies in the slice and how far the source may be advanced past it. A bound whose text is empty is treated as "no bound". The value
+   * is not inspected.
+   *
+   * @param k
+   *          the key to test
+   * @param v
+   *          the value stored under the key
+   * @return an accepting result when both the column family and the column qualifier are inside the slice, otherwise a rejecting result carrying the cheapest
+   *         advance that cannot skip a matching key
+   */
+  @Override
+  public FilterResult filter(Key k, Value v) {
+    if (cso.minCf.getLength() > 0) {
+      int minCfCmp = k.compareColumnFamily(cso.minCf);
+      if (minCfCmp < 0) {
+        return SKIP_TO_HINT; // hint will be the min CF in this row.
+      }
+      if (minCfCmp == 0 && !cso.minInclusive) {
+        return SKIP_TO_NEXT;
+      }
+    }
+    if (cso.maxCf.getLength() > 0) {
+      int maxCfCmp = k.compareColumnFamily(cso.maxCf);
+      if (maxCfCmp > 0 || (maxCfCmp == 0 && !cso.maxInclusive)) {
+        // nothing later in this row can match.
+        return SKIP_TO_NEXT_ROW;
+      }
+    }
+    // at this point we're in the correct CF range, now check the CQ.
+    if (cso.minCq.getLength() > 0) {
+      int minCqCmp = k.compareColumnQualifier(cso.minCq);
+      if (minCqCmp < 0) {
+        return SKIP_TO_HINT; // hint will be the min CQ in this CF in this row.
+      }
+      if (minCqCmp == 0 && !cso.minInclusive) {
+        return SKIP_TO_NEXT;
+      }
+    }
+    if (cso.maxCq.getLength() > 0) {
+      int maxCqCmp = k.compareColumnQualifier(cso.maxCq);
+      if (maxCqCmp > 0 || (maxCqCmp == 0 && !cso.maxInclusive)) {
+        // nothing later in this CF can match.
+        return SKIP_TO_NEXT_CF;
+      }
+      // Note: a key whose CQ equals the inclusive max CQ must NOT jump to the next CF. Further keys with the same row, CF and CQ but a different visibility
+      // or timestamp may follow it and are part of the slice too; jumping to the next CF would silently drop them. The first key beyond the max CQ takes the
+      // SKIP_TO_NEXT_CF branch above, so the seek still happens, just one key later.
+    }
+    // at this point we're in the CQ slice.
+    return INCLUDE_AND_NEXT;
+  }
+
+  /**
+   * Supplies the seek target for the keys that filter() answered with USE_HINT, i.e. keys that sort before the minimum column family or, within a family,
+   * before the minimum column qualifier.
+   *
+   * @param k
+   *          the key that was rejected with USE_HINT
+   * @param v
+   *          the value stored under the key
+   * @return the first key of the minimum family (or of the minimum qualifier in the current family); when the minimum is exclusive, the key following that
+   *         family or qualifier
+   * @throws IllegalArgumentException
+   *           if the key sorts before neither minimum, which means filter() did not return USE_HINT for it
+   */
+  @Override
+  public Key getNextKeyHint(Key k, Value v) throws IllegalArgumentException {
+    if (cso.minCf.getLength() > 0) {
+      int minCfCmp = k.compareColumnFamily(cso.minCf);
+      if (minCfCmp < 0) {
+        Key hint = new Key(k.getRow(), cso.minCf);
+        return cso.minInclusive ? hint : hint.followingKey(PartialKey.ROW_COLFAM);
+      }
+    }
+    if (cso.minCq.getLength() > 0) {
+      int minCqCmp = k.compareColumnQualifier(cso.minCq);
+      if (minCqCmp < 0) {
+        Key hint = new Key(k.getRow(), k.getColumnFamily(), cso.minCq);
+        return cso.minInclusive ? hint : hint.followingKey(PartialKey.ROW_COLFAM_COLQUAL);
+      }
+    }
+    // If we get here it means that we were asked to provide a hint for a key that we
+    // didn't return USE_HINT for.
+    throw new IllegalArgumentException("Don't know how to provide hint for key " + k);
+  }
+
+  /**
+   * Copies the parent seeking filter and gives the copy its own, independent set of slice bounds.
+   */
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    CfCqSliceSeekingFilter o = (CfCqSliceSeekingFilter) super.deepCopy(env);
+    o.cso = new CfCqSliceOpts(cso);
+    return o;
+  }
+
+  /** Delegates to the option describer shared with the other slice iterator. */
+  @Override
+  public IteratorOptions describeOptions() {
+    return new CfCqSliceOpts.Describer().describeOptions();
+  }
+
+  /** Delegates to the option describer shared with the other slice iterator. */
+  @Override
+  public boolean validateOptions(Map<String,String> options) {
+    return new CfCqSliceOpts.Describer().validateOptions(options);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/89f44e24/core/src/main/java/org/apache/accumulo/core/iterators/user/SeekingFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/SeekingFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/SeekingFilter.java
new file mode 100644
index 0000000..bdc9b14
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/SeekingFilter.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+
+/**
+ * Base class for filters which, besides accepting or rejecting a key/value pair, can tell the iterator how far the source should be moved forward afterwards.
+ * This lets a subclass skip whole stretches of entries that cannot match its predicate.
+ *
+ * The negate option does not behave like the one of the Filter class. A pair that fails the subclass' predicate passes when negate is true, but pairs that are
+ * jumped over because the subclass advanced the source past them are rejected implicitly, and negate has no effect on them.
+ *
+ * @see org.apache.accumulo.core.iterators.Filter
+ */
+public abstract class SeekingFilter extends WrappingIterator {
+  private static final Logger log = LoggerFactory.getLogger(SeekingFilter.class);
+
+  protected static final String NEGATE = "negate";
+
+  /** How far the source is moved after a key has been examined. */
+  public enum AdvanceResult {
+    NEXT, NEXT_CQ, NEXT_CF, NEXT_ROW, USE_HINT
+  }
+
+  /** The verdict for one key: whether it is accepted, and how to advance the source past it. */
+  public static class FilterResult {
+    // one shared instance per (accept, advance) combination, handed out by of()
+    private static final EnumMap<AdvanceResult,FilterResult> PASSES = new EnumMap<>(AdvanceResult.class);
+    private static final EnumMap<AdvanceResult,FilterResult> FAILS = new EnumMap<>(AdvanceResult.class);
+    static {
+      for (AdvanceResult step : AdvanceResult.values()) {
+        PASSES.put(step, new FilterResult(true, step));
+        FAILS.put(step, new FilterResult(false, step));
+      }
+    }
+
+    final boolean accept;
+    final AdvanceResult advance;
+
+    public FilterResult(boolean accept, AdvanceResult advance) {
+      this.accept = accept;
+      this.advance = advance;
+    }
+
+    /** Returns the shared instance for the given combination. */
+    public static FilterResult of(boolean accept, AdvanceResult advance) {
+      if (accept) {
+        return PASSES.get(advance);
+      }
+      return FAILS.get(advance);
+    }
+
+    @Override
+    public String toString() {
+      return "Acc: " + accept + " Adv: " + advance;
+    }
+  }
+
+  /**
+   * Implemented by subclasses to examine a key and value and decide (1) whether the pair is accepted and (2) how far the source iterator is to be advanced
+   * past the key.
+   *
+   * @param k
+   *          a key
+   * @param v
+   *          a value
+   * @return indicating whether to pass or block the key, and how far the source iterator should be advanced.
+   */
+  public abstract FilterResult filter(Key k, Value v);
+
+  /**
+   * Called whenever the subclass' filter predicate answered AdvanceResult.USE_HINT, to find out where the source iterator should be moved to. The returned key
+   * must be greater than (sort after) the input key. A subclass that never returns USE_HINT will never see this method called and may safely return null.
+   *
+   * @param k
+   *          a key
+   * @param v
+   *          a value
+   * @return as above
+   */
+  public abstract Key getNextKeyHint(Key k, Value v);
+
+  // arguments of the most recent seek(), reused whenever the source is re-seeked
+  private Range seekRange;
+  private Collection<ByteSequence> columnFamilies;
+  private boolean inclusive;
+
+  private boolean negate;
+
+  // how to move the source past the current top entry; applied by next()
+  private AdvanceResult advance;
+
+  // set once an advance would leave the range of the most recent seek()
+  private boolean advancedPastSeek = false;
+
+  @Override
+  public void next() throws IOException {
+    // first move past the entry that was just returned, then look for the next one that passes
+    advanceSource(getSource(), advance);
+    findTop();
+  }
+
+  @Override
+  public boolean hasTop() {
+    if (advancedPastSeek) {
+      return false;
+    }
+    return super.hasTop();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    super.seek(range, columnFamilies, inclusive);
+    // remember the seek arguments so that later advances can re-seek inside the same range
+    this.seekRange = range;
+    this.columnFamilies = columnFamilies;
+    this.inclusive = inclusive;
+    this.advance = null;
+    this.advancedPastSeek = false;
+    findTop();
+  }
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    String negateOption = options.get(NEGATE);
+    negate = Boolean.parseBoolean(negateOption);
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    // the copy is created reflectively from the runtime class of this iterator
+    SeekingFilter copy;
+    try {
+      copy = this.getClass().newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    copy.setSource(getSource().deepCopy(env));
+    copy.negate = negate;
+    return copy;
+  }
+
+  /**
+   * Moves the source forward until its top entry passes the filter, the source runs out of entries, or an advance leaves the seek range. When an entry passes,
+   * the advance requested for it is remembered and applied by the next call to next().
+   */
+  protected void findTop() throws IOException {
+    final SortedKeyValueIterator<Key,Value> source = getSource();
+    // stays null unless a top entry is found below (e.g. right after a seek that finds nothing)
+    advance = null;
+    while (source.hasTop() && !advancedPastSeek) {
+      if (source.getTopKey().isDeleted()) {
+        // as per. o.a.a.core.iterators.Filter, deleted keys always pass through the filter.
+        advance = AdvanceResult.NEXT;
+        return;
+      }
+      final FilterResult verdict = filter(source.getTopKey(), source.getTopValue());
+      if (log.isTraceEnabled()) {
+        log.trace("Filtered: {} result == {} hint == {}", source.getTopKey(), verdict,
+            verdict.advance == AdvanceResult.USE_HINT ? getNextKeyHint(source.getTopKey(), source.getTopValue()) : " (none)");
+      }
+      if (verdict.accept == negate) {
+        // rejected: skip ahead right away and examine whatever comes next
+        advanceSource(source, verdict.advance);
+        continue;
+      }
+      // passed: the advance is carried out when next() is called
+      advance = verdict.advance;
+      return;
+    }
+  }
+
+  /**
+   * Moves the source past its current top key as requested. NEXT simply steps the source; every other kind of advance re-seeks the source to the part of the
+   * original seek range that starts at the computed key. If no such part exists, the iterator is marked as exhausted.
+   */
+  private void advanceSource(SortedKeyValueIterator<Key,Value> src, AdvanceResult adv) throws IOException {
+    final Key topKey = src.getTopKey();
+    final Key resumeAt;
+    switch (adv) {
+      case NEXT:
+        src.next();
+        return;
+      case NEXT_CQ:
+        resumeAt = topKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL);
+        break;
+      case NEXT_CF:
+        resumeAt = topKey.followingKey(PartialKey.ROW_COLFAM);
+        break;
+      case NEXT_ROW:
+        resumeAt = topKey.followingKey(PartialKey.ROW);
+        break;
+      case USE_HINT:
+        resumeAt = requireValidHint(topKey, src.getTopValue());
+        break;
+      default:
+        // Should never get here. Just a safeguard in case somebody adds a new type of AdvanceRange and forgets to handle it here.
+        throw new IOException("Unable to determine range to advance to for AdvanceResult " + adv);
+    }
+    final Range remaining = new Range(resumeAt, null).clip(seekRange, true);
+    if (remaining == null) {
+      // the advanced range is outside the seek range. the source is exhausted.
+      advancedPastSeek = true;
+      return;
+    }
+    src.seek(remaining, columnFamilies, inclusive);
+  }
+
+  /**
+   * Asks the subclass for its hint and makes sure the hint really sorts after the current top key.
+   *
+   * @throws IOException
+   *           if the hint is null or does not sort after the top key
+   */
+  private Key requireValidHint(Key topKey, Value topVal) throws IOException {
+    final Key hintKey = getNextKeyHint(topKey, topVal);
+    if (hintKey == null || hintKey.compareTo(topKey) <= 0) {
+      String msg = "Filter returned USE_HINT for " + topKey + " but invalid hint: " + hintKey;
+      throw new IOException(msg);
+    }
+    return hintKey;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/89f44e24/core/src/test/java/org/apache/accumulo/core/iterators/user/TestCfCqSlice.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/user/TestCfCqSlice.java b/core/src/test/java/org/apache/accumulo/core/iterators/user/TestCfCqSlice.java
new file mode 100644
index 0000000..d379778
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/user/TestCfCqSlice.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.lexicoder.Lexicoder;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.apache.accumulo.core.iterators.ValueFormatException;
+import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public abstract class TestCfCqSlice {
+
+ // Unbounded range, used by tests that scan the whole fixture.
+ private static final Range INFINITY = new Range();
+ // Encodes longs as 4-digit zero-padded decimal strings; used for rows, column families, column qualifiers and option values.
+ private static final Lexicoder<Long> LONG_LEX = new ReadableLongLexicoder(4);
+ // Counter held in a static field of this abstract class, so it is shared by all subclasses loaded in the same JVM.
+ private static final AtomicLong ROW_ID_GEN = new AtomicLong();
+
+ // When true the fixture shrinks from 50x50x50 to 5x5x5 cells.
+ // NOTE(review): several tests assert that hard-coded slice bounds (20..35) are below LR_DIM, so they would fail with the small fixture.
+ private static final boolean easyThereSparky = false;
+ // Number of rows, of column families per row, and of column qualifiers per column family.
+ private static final int LR_DIM = easyThereSparky ? 5 : 50;
+
+ // Shared immutable "no options" / "no column families" arguments.
+ private static final Map<String,String> EMPTY_OPTS = Collections.emptyMap();
+ private static final Set<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
+
+ // Supplies the iterator class under test; it is instantiated reflectively via newInstance().
+ protected abstract Class<? extends SortedKeyValueIterator<Key,Value>> getFilterClass();
+
+ // Fixture data: populated in setupData() and cleared in clearData().
+ private static TreeMap<Key,Value> data;
+
+ /** Builds the LR_DIM x LR_DIM x LR_DIM fixture and stores it in the static {@code data} field (JUnit {@code @BeforeClass} hook). */
+ @BeforeClass
+ public static void setupData() {
+ data = createMap(LR_DIM, LR_DIM, LR_DIM);
+ }
+
+ /** Drops the reference to the fixture so it can be garbage collected (JUnit {@code @AfterClass} hook). */
+ @AfterClass
+ public static void clearData() {
+ data = null;
+ }
+
+ @Test
+ public void testAllRowsFullSlice() throws Exception {
+ boolean[][][] foundKvs = new boolean[LR_DIM][LR_DIM][LR_DIM];
+ loadKvs(foundKvs, EMPTY_OPTS, INFINITY);
+ for (int i = 0; i < LR_DIM; i++) {
+ for (int j = 0; j < LR_DIM; j++) {
+ for (int k = 0; k < LR_DIM; k++) {
+ assertTrue("(r, cf, cq) == (" + i + ", " + j + ", " + k + ") must be found in scan", foundKvs[i][j][k]);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSingleRowFullSlice() throws Exception {
+ boolean[][][] foundKvs = new boolean[LR_DIM][LR_DIM][LR_DIM];
+ int rowId = LR_DIM / 2;
+ loadKvs(foundKvs, EMPTY_OPTS, Range.exact(new Text(LONG_LEX.encode((long) rowId))));
+ for (int i = 0; i < LR_DIM; i++) {
+ for (int j = 0; j < LR_DIM; j++) {
+ for (int k = 0; k < LR_DIM; k++) {
+ if (rowId == i) {
+ assertTrue("(r, cf, cq) == (" + i + ", " + j + ", " + k + ") must be found in scan", foundKvs[i][j][k]);
+ } else {
+ assertFalse("(r, cf, cq) == (" + i + ", " + j + ", " + k + ") must not be found in scan", foundKvs[i][j][k]);
+ }
+ }
+ }
+ }
+ }
+
+  /**
+   * Scans the unbounded range with a column family window of 20..25 and a column qualifier window of 30..35, and asserts that exactly the cells whose family
+   * and qualifier both fall inside those windows (bounds included) are returned.
+   */
+  @Test
+  public void testAllRowsSlice() throws Exception {
+    final long minCf = 20;
+    final long minCq = 30;
+    final long maxCf = 25;
+    final long maxCq = 35;
+    for (long bound : new long[] {minCf, minCq, maxCf, maxCq}) {
+      assertTrue("slice param must be less than LR_DIM", bound < LR_DIM);
+    }
+
+    Map<String,String> opts = new HashMap<String,String>();
+    opts.put(CfCqSliceOpts.OPT_MIN_CF, new String(LONG_LEX.encode(minCf), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MIN_CQ, new String(LONG_LEX.encode(minCq), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_CF, new String(LONG_LEX.encode(maxCf), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_CQ, new String(LONG_LEX.encode(maxCq), UTF_8));
+
+    boolean[][][] seen = new boolean[LR_DIM][LR_DIM][LR_DIM];
+    loadKvs(seen, opts, INFINITY);
+    for (int r = 0; r < LR_DIM; r++) {
+      for (int f = 0; f < LR_DIM; f++) {
+        for (int q = 0; q < LR_DIM; q++) {
+          boolean inSlice = f >= minCf && f <= maxCf && q >= minCq && q <= maxCq;
+          String cell = "(r, cf, cq) == (" + r + ", " + f + ", " + q + ")";
+          if (inSlice) {
+            assertTrue(cell + " must be found in scan", seen[r][f][q]);
+          } else {
+            assertFalse(cell + " must not be found in scan", seen[r][f][q]);
+          }
+        }
+      }
+    }
+  }
+
+  /** Sets all four slice bounds to 20 and asserts that, in every row, only the cell at (cf 20, cq 20) is returned. */
+  @Test
+  public void testSingleColumnSlice() throws Exception {
+    final long minCf = 20;
+    final long minCq = 20;
+    final long maxCf = 20;
+    final long maxCq = 20;
+
+    Map<String,String> opts = new HashMap<String,String>();
+    opts.put(CfCqSliceOpts.OPT_MIN_CF, new String(LONG_LEX.encode(minCf), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MIN_CQ, new String(LONG_LEX.encode(minCq), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_CF, new String(LONG_LEX.encode(maxCf), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_CQ, new String(LONG_LEX.encode(maxCq), UTF_8));
+
+    boolean[][][] seen = new boolean[LR_DIM][LR_DIM][LR_DIM];
+    loadKvs(seen, opts, INFINITY);
+    for (int r = 0; r < LR_DIM; r++) {
+      for (int f = 0; f < LR_DIM; f++) {
+        for (int q = 0; q < LR_DIM; q++) {
+          boolean isTheColumn = f == minCf && q == minCq;
+          String cell = "(r, cf, cq) == (" + r + ", " + f + ", " + q + ")";
+          if (isTheColumn) {
+            assertTrue(cell + " must be found in scan", seen[r][f][q]);
+          } else {
+            assertFalse(cell + " must not be found in scan", seen[r][f][q]);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Uses bounds 20 and 22 for both family and qualifier with both ends marked non-inclusive, and asserts that, in every row, only the cell at (cf 21, cq 21)
+   * is returned.
+   */
+  @Test
+  public void testSingleColumnSliceByExclude() throws Exception {
+    final long minCf = 20;
+    final long minCq = 20;
+    final long maxCf = 22;
+    final long maxCq = 22;
+
+    Map<String,String> opts = new HashMap<String,String>();
+    opts.put(CfCqSliceOpts.OPT_MIN_CF, new String(LONG_LEX.encode(minCf), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MIN_CQ, new String(LONG_LEX.encode(minCq), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_CF, new String(LONG_LEX.encode(maxCf), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_CQ, new String(LONG_LEX.encode(maxCq), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_INCLUSIVE, "false");
+    opts.put(CfCqSliceOpts.OPT_MIN_INCLUSIVE, "false");
+
+    boolean[][][] seen = new boolean[LR_DIM][LR_DIM][LR_DIM];
+    loadKvs(seen, opts, INFINITY);
+    for (int r = 0; r < LR_DIM; r++) {
+      for (int f = 0; f < LR_DIM; f++) {
+        for (int q = 0; q < LR_DIM; q++) {
+          boolean isTheColumn = f == 21 && q == 21;
+          String cell = "(r, cf, cq) == (" + r + ", " + f + ", " + q + ")";
+          if (isTheColumn) {
+            assertTrue(cell + " must be found in scan", seen[r][f][q]);
+          } else {
+            assertFalse(cell + " must not be found in scan", seen[r][f][q]);
+          }
+        }
+      }
+    }
+  }
+
+  /** Configures only a column qualifier window of 10..30 and asserts that, for every row and family, exactly the qualifiers in that window are returned. */
+  @Test
+  public void testAllCfsCqSlice() throws Exception {
+    final long minCq = 10;
+    final long maxCq = 30;
+
+    Map<String,String> opts = new HashMap<String,String>();
+    opts.put(CfCqSliceOpts.OPT_MIN_CQ, new String(LONG_LEX.encode(minCq), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_CQ, new String(LONG_LEX.encode(maxCq), UTF_8));
+
+    boolean[][][] seen = new boolean[LR_DIM][LR_DIM][LR_DIM];
+    loadKvs(seen, opts, INFINITY);
+    for (int r = 0; r < LR_DIM; r++) {
+      for (int f = 0; f < LR_DIM; f++) {
+        for (int q = 0; q < LR_DIM; q++) {
+          boolean inSlice = q >= minCq && q <= maxCq;
+          String cell = "(r, cf, cq) == (" + r + ", " + f + ", " + q + ")";
+          if (inSlice) {
+            assertTrue(cell + " must be found in scan", seen[r][f][q]);
+          } else {
+            assertFalse(cell + " must not be found in scan", seen[r][f][q]);
+          }
+        }
+      }
+    }
+  }
+
+  /** Configures only a column family window of 10..30 and asserts that, for every row, exactly the cells of the families in that window are returned. */
+  @Test
+  public void testSliceCfsAllCqs() throws Exception {
+    final long minCf = 10;
+    final long maxCf = 30;
+
+    Map<String,String> opts = new HashMap<String,String>();
+    opts.put(CfCqSliceOpts.OPT_MIN_CF, new String(LONG_LEX.encode(minCf), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_CF, new String(LONG_LEX.encode(maxCf), UTF_8));
+
+    boolean[][][] seen = new boolean[LR_DIM][LR_DIM][LR_DIM];
+    loadKvs(seen, opts, INFINITY);
+    for (int r = 0; r < LR_DIM; r++) {
+      for (int f = 0; f < LR_DIM; f++) {
+        for (int q = 0; q < LR_DIM; q++) {
+          boolean inSlice = f >= minCf && f <= maxCf;
+          String cell = "(r, cf, cq) == (" + r + ", " + f + ", " + q + ")";
+          if (inSlice) {
+            assertTrue(cell + " must be found in scan", seen[r][f][q]);
+          } else {
+            assertFalse(cell + " must not be found in scan", seen[r][f][q]);
+          }
+        }
+      }
+    }
+  }
+
+  /** Sets every slice bound to LR_DIM + 1 with both ends marked non-inclusive and asserts that no cell of the fixture is returned. */
+  @Test
+  public void testEmptySlice() throws Exception {
+    final long minCf = LR_DIM + 1;
+    final long minCq = LR_DIM + 1;
+    final long maxCf = LR_DIM + 1;
+    final long maxCq = LR_DIM + 1;
+
+    Map<String,String> opts = new HashMap<String,String>();
+    opts.put(CfCqSliceOpts.OPT_MIN_CF, new String(LONG_LEX.encode(minCf), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MIN_CQ, new String(LONG_LEX.encode(minCq), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_CF, new String(LONG_LEX.encode(maxCf), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_CQ, new String(LONG_LEX.encode(maxCq), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_INCLUSIVE, "false");
+    opts.put(CfCqSliceOpts.OPT_MIN_INCLUSIVE, "false");
+
+    boolean[][][] seen = new boolean[LR_DIM][LR_DIM][LR_DIM];
+    loadKvs(seen, opts, INFINITY);
+    for (int r = 0; r < LR_DIM; r++) {
+      for (int f = 0; f < LR_DIM; f++) {
+        for (int q = 0; q < LR_DIM; q++) {
+          String cell = "(r, cf, cq) == (" + r + ", " + f + ", " + q + ")";
+          assertFalse(cell + " must not be found in scan", seen[r][f][q]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Stacks two filter instances: the lower one is configured with the column family window 20..25 and the one on top of it (fed a deep copy of the lower
+   * one) with the column qualifier window 30..35. Asserts that exactly the cells inside both windows are returned.
+   */
+  @Test
+  public void testStackedFilters() throws Exception {
+    final long minCf = 20;
+    final long maxCf = 25;
+    final long minCq = 30;
+    final long maxCq = 35;
+    for (long bound : new long[] {minCf, minCq, maxCf, maxCq}) {
+      assertTrue("slice param must be less than LR_DIM", bound < LR_DIM);
+    }
+
+    Map<String,String> cfOnly = new HashMap<>();
+    cfOnly.put(CfCqSliceOpts.OPT_MIN_CF, new String(LONG_LEX.encode(minCf), UTF_8));
+    cfOnly.put(CfCqSliceOpts.OPT_MAX_CF, new String(LONG_LEX.encode(maxCf), UTF_8));
+
+    Map<String,String> cqOnly = new HashMap<>();
+    cqOnly.put(CfCqSliceOpts.OPT_MIN_CQ, new String(LONG_LEX.encode(minCq), UTF_8));
+    cqOnly.put(CfCqSliceOpts.OPT_MAX_CQ, new String(LONG_LEX.encode(maxCq), UTF_8));
+
+    SortedKeyValueIterator<Key,Value> lower = getFilterClass().newInstance();
+    lower.init(new SortedMapIterator(data), cfOnly, null);
+
+    boolean[][][] seen = new boolean[LR_DIM][LR_DIM][LR_DIM];
+    loadKvs(lower.deepCopy(null), seen, cqOnly, INFINITY);
+    for (int r = 0; r < LR_DIM; r++) {
+      for (int f = 0; f < LR_DIM; f++) {
+        for (int q = 0; q < LR_DIM; q++) {
+          boolean inSlice = f >= minCf && f <= maxCf && q >= minCq && q <= maxCq;
+          String cell = "(r, cf, cq) == (" + r + ", " + f + ", " + q + ")";
+          if (inSlice) {
+            assertTrue(cell + " must be found in scan", seen[r][f][q]);
+          } else {
+            assertFalse(cell + " must not be found in scan", seen[r][f][q]);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks slices whose minimum bound is marked non-inclusive, in two scans.
+   *
+   * <p>
+   * The first scan uses the windows cf 20..25 and cq 30..35 and a range that starts at the key (row 0, cf 20, cq 30), i.e. exactly at the excluded minimum.
+   * The second scan keeps the family window, switches the qualifier window to 0..10 and scans the unbounded range. In both cases only cells strictly above
+   * the minimum and at or below the maximum, in both family and qualifier, are expected.
+   */
+  @Test
+  public void testSeekMinExclusive() throws Exception {
+    final long minCf = 20;
+    final long maxCf = 25;
+    long minCq = 30;
+    long maxCq = 35;
+    for (long bound : new long[] {minCf, minCq, maxCf, maxCq}) {
+      assertTrue("slice param must be less than LR_DIM", bound < LR_DIM);
+    }
+
+    Map<String,String> opts = new HashMap<>();
+    opts.put(CfCqSliceOpts.OPT_MIN_CF, new String(LONG_LEX.encode(minCf), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MIN_INCLUSIVE, "false");
+    opts.put(CfCqSliceOpts.OPT_MIN_CQ, new String(LONG_LEX.encode(minCq), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_CF, new String(LONG_LEX.encode(maxCf), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_CQ, new String(LONG_LEX.encode(maxCq), UTF_8));
+
+    // First scan: start exactly on the excluded minimum column of row 0.
+    Key firstKey = new Key(LONG_LEX.encode(0L), LONG_LEX.encode(minCf), LONG_LEX.encode(minCq), new byte[] {}, Long.MAX_VALUE);
+    Range startsAtMinCf = new Range(firstKey, null);
+    boolean[][][] seen = new boolean[LR_DIM][LR_DIM][LR_DIM];
+    loadKvs(seen, opts, startsAtMinCf);
+    for (int r = 0; r < LR_DIM; r++) {
+      for (int f = 0; f < LR_DIM; f++) {
+        for (int q = 0; q < LR_DIM; q++) {
+          boolean inSlice = f > minCf && f <= maxCf && q > minCq && q <= maxCq;
+          String cell = "(r, cf, cq) == (" + r + ", " + f + ", " + q + ")";
+          if (inSlice) {
+            assertTrue(cell + " must be found in scan", seen[r][f][q]);
+          } else {
+            assertFalse(cell + " must not be found in scan", seen[r][f][q]);
+          }
+        }
+      }
+    }
+
+    // Second scan: only the qualifier window changes; the other options already hold the values they need.
+    minCq = 0;
+    maxCq = 10;
+    opts.put(CfCqSliceOpts.OPT_MIN_CQ, new String(LONG_LEX.encode(minCq), UTF_8));
+    opts.put(CfCqSliceOpts.OPT_MAX_CQ, new String(LONG_LEX.encode(maxCq), UTF_8));
+    seen = new boolean[LR_DIM][LR_DIM][LR_DIM];
+    loadKvs(seen, opts, INFINITY);
+    for (int r = 0; r < LR_DIM; r++) {
+      for (int f = 0; f < LR_DIM; f++) {
+        for (int q = 0; q < LR_DIM; q++) {
+          boolean inSlice = f > minCf && f <= maxCf && q > minCq && q <= maxCq;
+          String cell = "(r, cf, cq) == (" + r + ", " + f + ", " + q + ")";
+          if (inSlice) {
+            assertTrue(cell + " must be found in scan", seen[r][f][q]);
+          } else {
+            assertFalse(cell + " must not be found in scan", seen[r][f][q]);
+          }
+        }
+      }
+    }
+  }
+
+ /** Convenience overload: runs the filter under test directly on top of a fresh {@code SortedMapIterator} over the shared fixture. */
+ private void loadKvs(boolean[][][] foundKvs, Map<String,String> options, Range range) {
+ loadKvs(new SortedMapIterator(data), foundKvs, options, range);
+ }
+
+  /**
+   * Instantiates the filter under test on top of {@code parent}, seeks it to {@code range} and records every key it returns by setting
+   * {@code foundKvs[row][cf][cq]}. A key that is returned twice fails the test.
+   *
+   * <p>
+   * To exercise re-seeking as well as {@code next()}, roughly one step in a hundred (chosen at random) re-seeks to the range that starts just after the
+   * current key and keeps the original end, instead of calling {@code next()}.
+   *
+   * @param parent
+   *          the source the filter reads from
+   * @param foundKvs
+   *          output grid, indexed by decoded row, column family and column qualifier
+   * @param options
+   *          iterator options passed to {@code init}
+   * @param range
+   *          the range to scan
+   * @throws RuntimeException
+   *           wrapping any checked or unchecked exception raised while creating or driving the iterator
+   */
+  private void loadKvs(SortedKeyValueIterator<Key,Value> parent, boolean[][][] foundKvs, Map<String,String> options, Range range) {
+    try {
+      SortedKeyValueIterator<Key,Value> filter = getFilterClass().newInstance();
+      filter.init(parent, options, null);
+      filter.seek(range, EMPTY_CF_SET, false);
+
+      Random dice = new Random();
+
+      while (filter.hasTop()) {
+        Key top = filter.getTopKey();
+        int r = LONG_LEX.decode(top.getRow().copyBytes()).intValue();
+        int f = LONG_LEX.decode(top.getColumnFamily().copyBytes()).intValue();
+        int q = LONG_LEX.decode(top.getColumnQualifier().copyBytes()).intValue();
+
+        assertFalse("Duplicate " + r + " " + f + " " + q, foundKvs[r][f][q]);
+        foundKvs[r][f][q] = true;
+
+        boolean reseek = dice.nextInt(100) == 0;
+        if (!reseek) {
+          filter.next();
+          continue;
+        }
+        Range remainder = new Range(top, false, range.getEndKey(), range.isEndKeyInclusive());
+        filter.seek(remainder, EMPTY_CF_SET, false);
+      }
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Builds the test fixture: rows 0..(numRows - 1), each with numCfs column families, each with numCqs column qualifiers. Row, column family and column
+   * qualifier are encoded with {@code LONG_LEX}; every key has an empty visibility and timestamp 9. The value of a cell is its row-major ordinal,
+   * {@code (row * numCfs + cf) * numCqs + cq}, also encoded with {@code LONG_LEX}.
+   *
+   * <p>
+   * For instance, if all three dimensions are 3:
+   *
+   * <pre>
+   * (cf,cq)  (0,0) (0,1) (0,2) (1,0) (1,1) (1,2) (2,0) (2,1) (2,2)
+   * r = 0      0     1     2     3     4     5     6     7     8
+   * r = 1      9    10    11    12    13    14    15    16    17
+   * r = 2     18    19    20    21    22    23    24    25    26
+   * </pre>
+   *
+   * <p>
+   * Row ids are taken from the loop index, not from a shared counter, so every call produces the same keys. The tests index their result grids by decoded
+   * row id and therefore need rows to start at 0 no matter how many times the fixture has been built in this JVM.
+   *
+   * @param numRows
+   *          number of rows to generate
+   * @param numCfs
+   *          number of column families per row
+   * @param numCqs
+   *          number of column qualifiers per column family
+   * @return a new sorted map holding numRows * numCfs * numCqs entries
+   */
+  static TreeMap<Key,Value> createMap(int numRows, int numCfs, int numCqs) {
+    TreeMap<Key,Value> cells = new TreeMap<>();
+    for (int i = 0; i < numRows; i++) {
+      byte[] rowId = LONG_LEX.encode((long) i);
+      for (int j = 0; j < numCfs; j++) {
+        // The family bytes do not depend on the qualifier, so encode them once per family.
+        byte[] cf = LONG_LEX.encode((long) j);
+        for (int k = 0; k < numCqs; k++) {
+          byte[] cq = LONG_LEX.encode((long) k);
+          long ordinal = ((long) i * numCfs + j) * numCqs + k;
+          cells.put(new Key(rowId, cf, cq, new byte[0], 9), new Value(LONG_LEX.encode(ordinal)));
+        }
+      }
+    }
+    return cells;
+  }
+
+  /**
+   * Lexicoder that writes a long as a zero-padded decimal string in UTF-8, so encoded keys stay human readable. The padding is a minimum width: values with
+   * more digits than {@code numDigits} are written in full rather than truncated.
+   */
+  static class ReadableLongLexicoder implements Lexicoder<Long> {
+    /** {@code String.format} pattern of the form {@code %0<numDigits>d}. */
+    final String fmtStr;
+
+    /** Creates a lexicoder that pads to 20 digits. */
+    public ReadableLongLexicoder() {
+      this(20);
+    }
+
+    /**
+     * @param numDigits
+     *          minimum number of digits to pad each encoded value to
+     */
+    public ReadableLongLexicoder(int numDigits) {
+      fmtStr = "%0" + numDigits + "d";
+    }
+
+    /** Formats {@code l} with the padding pattern and returns the UTF-8 bytes of the result. */
+    @Override
+    public byte[] encode(Long l) {
+      // %d is locale-sensitive: without an explicit locale the JVM's default locale may substitute non-ASCII digits.
+      // Locale.ROOT keeps the encoding identical on every machine.
+      return String.format(java.util.Locale.ROOT, fmtStr, l).getBytes(UTF_8);
+    }
+
+    /** Parses the UTF-8 decimal text in {@code b} back into a long. */
+    @Override
+    public Long decode(byte[] b) throws ValueFormatException {
+      return Long.parseLong(new String(b, UTF_8));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/89f44e24/core/src/test/java/org/apache/accumulo/core/iterators/user/TestCfCqSliceFilter.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/user/TestCfCqSliceFilter.java b/core/src/test/java/org/apache/accumulo/core/iterators/user/TestCfCqSliceFilter.java
new file mode 100644
index 0000000..3f92963
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/user/TestCfCqSliceFilter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+/** Runs the shared {@link TestCfCqSlice} test suite against {@link CfCqSliceFilter}. */
+public class TestCfCqSliceFilter extends TestCfCqSlice {
+ @Override
+ protected Class<CfCqSliceFilter> getFilterClass() {
+ return CfCqSliceFilter.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/89f44e24/core/src/test/java/org/apache/accumulo/core/iterators/user/TestCfCqSliceSeekingFilter.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/user/TestCfCqSliceSeekingFilter.java b/core/src/test/java/org/apache/accumulo/core/iterators/user/TestCfCqSliceSeekingFilter.java
new file mode 100644
index 0000000..314b510
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/user/TestCfCqSliceSeekingFilter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+/** Runs the shared {@link TestCfCqSlice} test suite against {@link CfCqSliceSeekingFilter}. */
+public class TestCfCqSliceSeekingFilter extends TestCfCqSlice {
+ @Override
+ protected Class<CfCqSliceSeekingFilter> getFilterClass() {
+ return CfCqSliceSeekingFilter.class;
+ }
+}