Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2021/11/04 17:48:21 UTC

[GitHub] [accumulo] milleruntime opened a new pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

milleruntime opened a new pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347


   * Create a new IT for testing the runtime bug that was introduced when moving
   the iterators to declare them public API in #1411. The other changes in
   this commit fix the bug.
   * Drop new class InterruptibleMapIterator that was created in #1411
   * Revert SortedMapIterator back to previous implementation where it
   was an InterruptibleIterator but keep it in core.iterators pkg
   * Move InterruptibleIterator interface to API and add javadoc
   * Move ColumnFamilySkippingIterator to API
   * Add javadoc to IterationInterruptedException
   * Fixes #2341


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r743746915



##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilySkippingIterator.java
##########
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.iteratorsImpl.system;
+package org.apache.accumulo.core.iterators;

Review comment:
       That is what I had originally thought, but after investigating this bug it seems like `SortedMapIterator` and this one could be cast to an `InterruptibleIterator` in the `LocalityGroupIterator`:
   https://github.com/apache/accumulo/blob/8d33bc4d489880f15950189d20287262ce46e68e/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/LocalityGroupIterator.java#L49
   
   I think if a user extends one of them and configures it to run in certain situations, it could be created through deepCopy(). But it also seems dangerous either way.
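    For reference, a minimal sketch (illustration only, not the actual `LocalityGroupIterator` code) of the kind of cast that breaks at runtime; the helper name is hypothetical and the `InterruptibleIterator` package used here is the one proposed in this PR:

    ```java
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.iterators.InterruptibleIterator;
    import org.apache.accumulo.core.iterators.IteratorEnvironment;
    import org.apache.accumulo.core.iterators.SortedKeyValueIterator;

    // Hypothetical helper: shows where a ClassCastException surfaces when a
    // deepCopy() result no longer implements InterruptibleIterator (the #1411 regression).
    class InterruptCastSketch {
      static void propagateInterrupt(SortedKeyValueIterator<Key,Value> source,
          IteratorEnvironment env, AtomicBoolean flag) {
        SortedKeyValueIterator<Key,Value> copy = source.deepCopy(env);
        // fails at runtime if copy is, e.g., a SortedMapIterator built without the interface
        ((InterruptibleIterator) copy).setInterruptFlag(flag);
      }
    }
    ```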







[GitHub] [accumulo] Manno15 commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r743164169



##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {

Review comment:
       ```suggestion
   
   ```







[GitHub] [accumulo] milleruntime commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r745625171



##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/IterationInterruptedException.java
##########
@@ -18,6 +18,9 @@
  */
 package org.apache.accumulo.core.iterators;
 
+/**
+ * Exception thrown if an interrupt flag is detected.
+ */
 public class IterationInterruptedException extends RuntimeException {

Review comment:
       Good catch. I moved it to iteratorsImpl in 892a867. Thankfully, nothing outside of the tserver code and RFile uses it.







[GitHub] [accumulo] ctubbsii commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r743166509



##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/InterruptibleIterator.java
##########
@@ -16,14 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.iteratorsImpl.system;
+package org.apache.accumulo.core.iterators;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 
+/**
+ * Allows an iterator to be interrupted. Typically, once the interrupt flag is set the iterator will
+ * throw an {@link InterruptedException} if the interrupt is detected. Some iterators have been
+ * optimized to not always check the flag.
+ *
+ * One example of a system interrupt is when a Tablet is being closed. If a Tablet has an active

Review comment:
       A blank line in javadoc without a `<p>` isn't correct.
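    A sketch of where the `<p>` would go (javadoc text copied from the diff above; only the first sentence of the second paragraph is shown since the diff is truncated there, and the interface body is assumed):

    ```java
    /**
     * Allows an iterator to be interrupted. Typically, once the interrupt flag is set the iterator
     * will throw an {@link InterruptedException} if the interrupt is detected. Some iterators have
     * been optimized to not always check the flag.
     * <p>
     * One example of a system interrupt is when a Tablet is being closed.
     */
    public interface InterruptibleIterator extends SortedKeyValueIterator<Key,Value> {
      void setInterruptFlag(AtomicBoolean interruptFlag);
    }
    ```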

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
##########
@@ -36,28 +37,40 @@
  *
  * Note that this class is intended as an in-memory replacement for RFile$Reader, so its behavior
  * reflects the same assumptions; namely, that this iterator is not responsible for respecting the
- * columnFamilies passed into seek().
+ * columnFamilies passed into seek(). If you want a Map-backed Iterator that returns only sought
+ * CFs, construct a new ColumnFamilySkippingIterator(new SortedMapIterator(map)).
+ *
+ * @see org.apache.accumulo.core.iterators.ColumnFamilySkippingIterator
  */
-public class SortedMapIterator implements SortedKeyValueIterator<Key,Value> {
+public class SortedMapIterator implements InterruptibleIterator {
   private Iterator<Entry<Key,Value>> iter;
   private Entry<Key,Value> entry;
 
   private SortedMap<Key,Value> map;
   private Range range;
 
+  private AtomicBoolean interruptFlag;

Review comment:
       Atomic references are typically `final` whenever possible, to prevent bugs where atomicity is broken because the reference is changed. I see this can't be made final because there's a setter... but that seems like bad API design. Is there something we can do to fix this?
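    One possible direction, sketched purely as an illustration (not this PR's code): inject the flag at construction so the reference can be final while still sharing the caller's `AtomicBoolean`. The catch is that `InterruptibleIterator` itself declares `setInterruptFlag()`, so removing the setter would mean changing that interface.

    ```java
    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustration only: a final reference still observes later flag.set(true)
    // calls made by the caller, because the AtomicBoolean object is shared.
    class FinalFlagSketch {
      private final AtomicBoolean interruptFlag; // never reassigned

      FinalFlagSketch(AtomicBoolean interruptFlag) {
        this.interruptFlag = interruptFlag;
      }

      boolean isInterrupted() {
        return interruptFlag.get();
      }
    }
    ```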

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }

Review comment:
       Our test file is small enough that this can be reduced to two simple lines... one to read the file fully into a String, and another to compute the digest from the String's bytes. This would improve the comprehensibility of the test, since we don't actually need to chunk and buffer the tiny test file.
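    A sketch of what that simplification could look like (assumes Java 11+ `Files.readString`, a UTF-8 test file, and inlines the hex conversion so the snippet is self-contained):

    ```java
    import static java.nio.charset.StandardCharsets.UTF_8;

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    class SmallFileDigestSketch {
      static String md5Hex(String filename) throws IOException, NoSuchAlgorithmException {
        // read the small test file fully, then digest its bytes in one call
        String contents = Files.readString(Path.of(filename));
        byte[] digest = MessageDigest.getInstance("MD5").digest(contents.getBytes(UTF_8));
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
          sb.append(String.format("%02x", b));
        }
        return sb.toString();
      }
    }
    ```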

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+
+      String hash = hexString(md5digest.digest());
+      Text row = new Text(hash);
+
+      // write info to accumulo
+      Mutation m = new Mutation(row);
+      m.put(REFS_CF, buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+      String fext = getExt(filename);
+      if (fext != null)
+        m.put(REFS_CF, buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+      bw.addMutation(m);
+
+      // read through file again, writing chunks to accumulo
+      int chunkCount = 0;
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          while (numRead < buf.length) {
+            int moreRead = fis.read(buf, numRead, buf.length - numRead);
+            if (moreRead > 0)
+              numRead += moreRead;
+            else if (moreRead < 0)
+              break;
+          }
+          m = new Mutation(row);
+          Text chunkCQ = new Text(chunkSizeBytes);
+          chunkCQ.append(intToBytes(chunkCount), 0, 4);
+          m.put(CHUNK_CF, chunkCQ, cv, new Value(buf, 0, numRead));
+          bw.addMutation(m);
+          if (chunkCount == Integer.MAX_VALUE)
+            throw new RuntimeException(
+                "too many chunks for file " + filename + ", try raising chunk size");
+          chunkCount++;
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+      m = new Mutation(row);
+      Text chunkCQ = new Text(chunkSizeBytes);
+      chunkCQ.append(intToBytes(chunkCount), 0, 4);
+      m.put(new Text(CHUNK_CF), chunkCQ, cv, new Value(new byte[0]));
+      bw.addMutation(m);
+      return hash;
+    }
+
+    public static byte[] intToBytes(int l) {
+      byte[] b = new byte[4];
+      b[0] = (byte) (l >>> 24);
+      b[1] = (byte) (l >>> 16);
+      b[2] = (byte) (l >>> 8);
+      b[3] = (byte) (l >>> 0);
+      return b;
+    }
+
+    private static String getExt(String filename) {
+      if (filename.indexOf(".") == -1)
+        return null;
+      return filename.substring(filename.lastIndexOf(".") + 1);
+    }
+
+    public String hexString(byte[] bytes) {
+      StringBuilder sb = new StringBuilder();
+      for (byte b : bytes) {
+        sb.append(String.format("%02x", b));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * This iterator dedupes chunks and sets their visibilities to the combined visibility of the refs
+   * columns. For example, it would combine
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 A&amp;B V1
+   *    row1 ~chunk 0 C&amp;D V1
+   *    row1 ~chunk 0 E&amp;F V1
+   *    row1 ~chunk 0 G&amp;H V1
+   * </pre>
+   *
+   * into the following
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 (A&amp;B)|(C&amp;D) V1
+   * </pre>
+   *
+   * {@link VisibilityCombiner} is used to combine the visibilities.
+   */
+  public static class ChunkCombiner implements SortedKeyValueIterator<Key,Value> {
+    private static final Logger log = LoggerFactory.getLogger(ChunkCombiner.class);
+
+    private SortedKeyValueIterator<Key,Value> source;
+    private SortedKeyValueIterator<Key,Value> refsSource;
+    private static final Collection<ByteSequence> refsColf =
+        Collections.singleton(FileDataIngest.REFS_CF_BS);
+    private Map<Text,byte[]> lastRowVC = Collections.emptyMap();
+
+    private Key topKey = null;
+    private Value topValue = null;
+
+    public ChunkCombiner() {}
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      this.source = source;
+      this.refsSource = source.deepCopy(env);
+      log.info("DUDE call init. deepCopy on {}", source.getClass());
+    }
+
+    @Override
+    public boolean hasTop() {
+      return topKey != null;
+    }
+
+    @Override
+    public void next() throws IOException {
+      findTop();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+        throws IOException {
+      source.seek(range, columnFamilies, inclusive);
+      findTop();
+    }
+
+    private void findTop() throws IOException {
+      do {
+        topKey = null;
+        topValue = null;
+      } while (source.hasTop() && _findTop() == null);
+    }
+
+    private byte[] _findTop() throws IOException {
+      long maxTS;
+
+      topKey = new Key(source.getTopKey());
+      topValue = new Value(source.getTopValue());
+      source.next();
+
+      if (!topKey.getColumnFamilyData().equals(FileDataIngest.CHUNK_CF_BS))
+        return topKey.getColumnVisibility().getBytes();
+
+      maxTS = topKey.getTimestamp();
+
+      while (source.hasTop() && source.getTopKey().equals(topKey, PartialKey.ROW_COLFAM_COLQUAL)) {
+        if (source.getTopKey().getTimestamp() > maxTS)
+          maxTS = source.getTopKey().getTimestamp();
+
+        if (!topValue.equals(source.getTopValue()))
+          throw new RuntimeException("values not equals " + topKey + " " + source.getTopKey()
+              + " : " + diffInfo(topValue, source.getTopValue()));
+
+        source.next();
+      }
+
+      byte[] vis = getVisFromRefs();
+      if (vis != null) {
+        topKey = new Key(topKey.getRowData().toArray(), topKey.getColumnFamilyData().toArray(),
+            topKey.getColumnQualifierData().toArray(), vis, maxTS);
+      }
+      return vis;
+    }
+
+    private byte[] getVisFromRefs() throws IOException {
+      Text row = topKey.getRow();
+      if (lastRowVC.containsKey(row))
+        return lastRowVC.get(row);
+      Range range = new Range(row);
+      refsSource.seek(range, refsColf, true);
+      VisibilityCombiner vc = null;
+      while (refsSource.hasTop()) {
+        if (vc == null)
+          vc = new VisibilityCombiner();
+        vc.add(refsSource.getTopKey().getColumnVisibilityData());
+        refsSource.next();
+      }
+      if (vc == null) {
+        lastRowVC = Collections.singletonMap(row, null);
+        return null;
+      }
+      lastRowVC = Collections.singletonMap(row, vc.get());
+      return vc.get();
+    }
+
+    private String diffInfo(Value v1, Value v2) {
+      if (v1.getSize() != v2.getSize()) {
+        return "val len not equal " + v1.getSize() + "!=" + v2.getSize();
+      }
+
+      byte[] vb1 = v1.get();
+      byte[] vb2 = v2.get();
+
+      for (int i = 0; i < vb1.length; i++) {
+        if (vb1[i] != vb2[i]) {
+          return String.format("first diff at offset %,d 0x%02x != 0x%02x", i, 0xff & vb1[i],
+              0xff & vb2[i]);
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public Key getTopKey() {
+      return topKey;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return topValue;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      ChunkCombiner cc = new ChunkCombiner();
+      try {
+        cc.init(source.deepCopy(env), null, env);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return cc;
+    }
+  }
+
+  /**
+   * Copied from 2.0 Examples. A utility for merging visibilities into the form
+   * {@code (VIS1)|(VIS2)|...|(VISN)}. Used by the ChunkCombiner.
+   */
+  public static class VisibilityCombiner {
+
+    private TreeSet<String> visibilities = new TreeSet<>();
+
+    void add(ByteSequence cv) {
+      if (cv.length() == 0)
+        return;
+
+      int depth = 0;
+      int offset = 0;
+
+      for (int i = 0; i < cv.length(); i++) {
+        switch (cv.byteAt(i)) {
+          case '(':
+            depth++;
+            break;
+          case ')':
+            depth--;
+            if (depth < 0)
+              throw new IllegalArgumentException("Invalid vis " + cv);
+            break;
+          case '|':
+            if (depth == 0) {
+              insert(cv.subSequence(offset, i));
+              offset = i + 1;
+            }
+
+            break;
+        }
+      }
+
+      insert(cv.subSequence(offset, cv.length()));
+
+      if (depth != 0)
+        throw new IllegalArgumentException("Invalid vis " + cv);
+
+    }
+
+    private void insert(ByteSequence cv) {
+      for (int i = 0; i < cv.length(); i++) {
+
+      }
+
+      String cvs = cv.toString();
+
+      if (cvs.charAt(0) != '(')
+        cvs = "(" + cvs + ")";
+      else {
+        int depth = 0;
+        int depthZeroCloses = 0;
+        for (int i = 0; i < cv.length(); i++) {
+          switch (cv.byteAt(i)) {
+            case '(':
+              depth++;
+              break;
+            case ')':
+              depth--;
+              if (depth == 0)
+                depthZeroCloses++;
+              break;
+          }
+        }
+
+        if (depthZeroCloses > 1)
+          cvs = "(" + cvs + ")";
+      }
+
+      visibilities.add(cvs);
+    }
+
+    byte[] get() {
+      StringBuilder sb = new StringBuilder();
+      String sep = "";
+      for (String cvs : visibilities) {
+        sb.append(sep);
+        sep = "|";
+        sb.append(cvs);
+      }
+
+      return sb.toString().getBytes();
+    }
+  }
+
+  public static final byte[] nullbyte = new byte[] {0};
+
+  /**
+   * Join some number of strings using a null byte separator into a text object.
+   *
+   * @param s
+   *          strings
+   * @return a text object containing the strings separated by null bytes
+   */

Review comment:
       This could probably just be a one-line inline comment. We're not publishing javadocs for ITs. This method can probably also be private... as could some others added in this file for testing.
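    For example, a sketch of how the helper could end up (signature and body assumed from the javadoc and the call sites in this file, reusing the class's `nullbyte` constant and Hadoop `Text`; not taken from the PR):

    ```java
    // joins strings with null-byte separators into a Text object
    private static Text buildNullSepText(String... strings) {
      Text t = new Text(strings[0]);
      for (int i = 1; i < strings.length; i++) {
        byte[] b = strings[i].getBytes();
        t.append(nullbyte, 0, 1); // null byte separator
        t.append(b, 0, b.length);
      }
      return t;
    }
    ```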

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
##########
@@ -95,6 +111,9 @@ public void next() throws IOException {
   public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
       throws IOException {
 
+    if (interruptFlag != null && interruptFlag.get())

Review comment:
       `interruptFlag` should never be null. We can require non-null anywhere we accept it.
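    A sketch of that tightening (illustration only; the real code would presumably throw `IterationInterruptedException` rather than a plain `RuntimeException`):

    ```java
    import java.util.Objects;
    import java.util.concurrent.atomic.AtomicBoolean;

    class NonNullFlagSketch {
      private AtomicBoolean interruptFlag = new AtomicBoolean(false); // non-null default

      void setInterruptFlag(AtomicBoolean flag) {
        // reject null up front instead of guarding every read
        this.interruptFlag = Objects.requireNonNull(flag, "interrupt flag must not be null");
      }

      void checkInterrupt() {
        if (interruptFlag.get()) { // no null check needed in seek()/next()
          throw new RuntimeException("iteration interrupted");
        }
      }
    }
    ```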

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+
+      String hash = hexString(md5digest.digest());
+      Text row = new Text(hash);
+
+      // write info to accumulo
+      Mutation m = new Mutation(row);
+      m.put(REFS_CF, buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+      String fext = getExt(filename);
+      if (fext != null)
+        m.put(REFS_CF, buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+      bw.addMutation(m);
+
+      // read through file again, writing chunks to accumulo
+      int chunkCount = 0;
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          while (numRead < buf.length) {
+            int moreRead = fis.read(buf, numRead, buf.length - numRead);
+            if (moreRead > 0)
+              numRead += moreRead;
+            else if (moreRead < 0)
+              break;
+          }
+          m = new Mutation(row);
+          Text chunkCQ = new Text(chunkSizeBytes);
+          chunkCQ.append(intToBytes(chunkCount), 0, 4);
+          m.put(CHUNK_CF, chunkCQ, cv, new Value(buf, 0, numRead));
+          bw.addMutation(m);
+          if (chunkCount == Integer.MAX_VALUE)
+            throw new RuntimeException(
+                "too many chunks for file " + filename + ", try raising chunk size");
+          chunkCount++;
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+      m = new Mutation(row);
+      Text chunkCQ = new Text(chunkSizeBytes);
+      chunkCQ.append(intToBytes(chunkCount), 0, 4);
+      m.put(new Text(CHUNK_CF), chunkCQ, cv, new Value(new byte[0]));
+      bw.addMutation(m);
+      return hash;
+    }
+
+    public static byte[] intToBytes(int l) {
+      byte[] b = new byte[4];
+      b[0] = (byte) (l >>> 24);
+      b[1] = (byte) (l >>> 16);
+      b[2] = (byte) (l >>> 8);
+      b[3] = (byte) (l >>> 0);
+      return b;
+    }
+
+    private static String getExt(String filename) {
+      if (filename.indexOf(".") == -1)
+        return null;
+      return filename.substring(filename.lastIndexOf(".") + 1);
+    }
+
+    public String hexString(byte[] bytes) {
+      StringBuilder sb = new StringBuilder();
+      for (byte b : bytes) {
+        sb.append(String.format("%02x", b));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * This iterator dedupes chunks and sets their visibilities to the combined visibility of the refs
+   * columns. For example, it would combine
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 A&amp;B V1
+   *    row1 ~chunk 0 C&amp;D V1
+   *    row1 ~chunk 0 E&amp;F V1
+   *    row1 ~chunk 0 G&amp;H V1
+   * </pre>
+   *
+   * into the following
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 (A&amp;B)|(C&amp;D) V1
+   * </pre>
+   *
+   * {@link VisibilityCombiner} is used to combine the visibilities.
+   */
+  public static class ChunkCombiner implements SortedKeyValueIterator<Key,Value> {
+    private static final Logger log = LoggerFactory.getLogger(ChunkCombiner.class);
+
+    private SortedKeyValueIterator<Key,Value> source;
+    private SortedKeyValueIterator<Key,Value> refsSource;
+    private static final Collection<ByteSequence> refsColf =
+        Collections.singleton(FileDataIngest.REFS_CF_BS);
+    private Map<Text,byte[]> lastRowVC = Collections.emptyMap();
+
+    private Key topKey = null;
+    private Value topValue = null;
+
+    public ChunkCombiner() {}
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      this.source = source;
+      this.refsSource = source.deepCopy(env);
+      log.info("DUDE call init. deepCopy on {}", source.getClass());
+    }
+
+    @Override
+    public boolean hasTop() {
+      return topKey != null;
+    }
+
+    @Override
+    public void next() throws IOException {
+      findTop();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+        throws IOException {
+      source.seek(range, columnFamilies, inclusive);
+      findTop();
+    }
+
+    private void findTop() throws IOException {
+      do {
+        topKey = null;
+        topValue = null;
+      } while (source.hasTop() && _findTop() == null);
+    }
+
+    private byte[] _findTop() throws IOException {
+      long maxTS;
+
+      topKey = new Key(source.getTopKey());
+      topValue = new Value(source.getTopValue());
+      source.next();
+
+      if (!topKey.getColumnFamilyData().equals(FileDataIngest.CHUNK_CF_BS))
+        return topKey.getColumnVisibility().getBytes();
+
+      maxTS = topKey.getTimestamp();
+
+      while (source.hasTop() && source.getTopKey().equals(topKey, PartialKey.ROW_COLFAM_COLQUAL)) {
+        if (source.getTopKey().getTimestamp() > maxTS)
+          maxTS = source.getTopKey().getTimestamp();
+
+        if (!topValue.equals(source.getTopValue()))
+          throw new RuntimeException("values not equals " + topKey + " " + source.getTopKey()
+              + " : " + diffInfo(topValue, source.getTopValue()));
+
+        source.next();
+      }
+
+      byte[] vis = getVisFromRefs();
+      if (vis != null) {
+        topKey = new Key(topKey.getRowData().toArray(), topKey.getColumnFamilyData().toArray(),
+            topKey.getColumnQualifierData().toArray(), vis, maxTS);
+      }
+      return vis;
+    }
+
+    private byte[] getVisFromRefs() throws IOException {
+      Text row = topKey.getRow();
+      if (lastRowVC.containsKey(row))
+        return lastRowVC.get(row);
+      Range range = new Range(row);
+      refsSource.seek(range, refsColf, true);
+      VisibilityCombiner vc = null;
+      while (refsSource.hasTop()) {
+        if (vc == null)
+          vc = new VisibilityCombiner();
+        vc.add(refsSource.getTopKey().getColumnVisibilityData());
+        refsSource.next();
+      }
+      if (vc == null) {
+        lastRowVC = Collections.singletonMap(row, null);
+        return null;
+      }
+      lastRowVC = Collections.singletonMap(row, vc.get());
+      return vc.get();
+    }
+
+    private String diffInfo(Value v1, Value v2) {
+      if (v1.getSize() != v2.getSize()) {
+        return "val len not equal " + v1.getSize() + "!=" + v2.getSize();
+      }
+
+      byte[] vb1 = v1.get();
+      byte[] vb2 = v2.get();
+
+      for (int i = 0; i < vb1.length; i++) {
+        if (vb1[i] != vb2[i]) {
+          return String.format("first diff at offset %,d 0x%02x != 0x%02x", i, 0xff & vb1[i],
+              0xff & vb2[i]);
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public Key getTopKey() {
+      return topKey;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return topValue;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      ChunkCombiner cc = new ChunkCombiner();
+      try {
+        cc.init(source.deepCopy(env), null, env);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return cc;
+    }
+  }
+
+  /**
+   * Copied from 2.0 Examples. A utility for merging visibilities into the form
+   * {@code (VIS1)|(VIS2)|...|(VISN)}. Used by the ChunkCombiner.
+   */
+  public static class VisibilityCombiner {
+
+    private TreeSet<String> visibilities = new TreeSet<>();
+
+    void add(ByteSequence cv) {
+      if (cv.length() == 0)
+        return;
+
+      int depth = 0;
+      int offset = 0;
+
+      for (int i = 0; i < cv.length(); i++) {
+        switch (cv.byteAt(i)) {
+          case '(':
+            depth++;
+            break;
+          case ')':
+            depth--;
+            if (depth < 0)
+              throw new IllegalArgumentException("Invalid vis " + cv);
+            break;
+          case '|':
+            if (depth == 0) {
+              insert(cv.subSequence(offset, i));
+              offset = i + 1;
+            }
+
+            break;
+        }
+      }
+
+      insert(cv.subSequence(offset, cv.length()));
+
+      if (depth != 0)
+        throw new IllegalArgumentException("Invalid vis " + cv);
+
+    }
+
+    private void insert(ByteSequence cv) {
+      for (int i = 0; i < cv.length(); i++) {
+
+      }

Review comment:
       What is this loop doing?

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilySkippingIterator.java
##########
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.iteratorsImpl.system;
+package org.apache.accumulo.core.iterators;

Review comment:
       Moving the interface back to public API seems like the obvious fix, but are we expecting users to actually use this interface? I haven't followed all the details, but my understanding was that this was a feature intended solely for internal use with some of our own system iterators. But, maybe I'm wrong? Is this something that is useful to user iterators and safe for them to use?

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+
+      String hash = hexString(md5digest.digest());
+      Text row = new Text(hash);
+
+      // write info to accumulo
+      Mutation m = new Mutation(row);
+      m.put(REFS_CF, buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+      String fext = getExt(filename);
+      if (fext != null)
+        m.put(REFS_CF, buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+      bw.addMutation(m);
+
+      // read through file again, writing chunks to accumulo
+      int chunkCount = 0;
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          while (numRead < buf.length) {
+            int moreRead = fis.read(buf, numRead, buf.length - numRead);
+            if (moreRead > 0)
+              numRead += moreRead;
+            else if (moreRead < 0)
+              break;
+          }
+          m = new Mutation(row);
+          Text chunkCQ = new Text(chunkSizeBytes);
+          chunkCQ.append(intToBytes(chunkCount), 0, 4);
+          m.put(CHUNK_CF, chunkCQ, cv, new Value(buf, 0, numRead));
+          bw.addMutation(m);
+          if (chunkCount == Integer.MAX_VALUE)
+            throw new RuntimeException(
+                "too many chunks for file " + filename + ", try raising chunk size");
+          chunkCount++;
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+      m = new Mutation(row);
+      Text chunkCQ = new Text(chunkSizeBytes);
+      chunkCQ.append(intToBytes(chunkCount), 0, 4);
+      m.put(new Text(CHUNK_CF), chunkCQ, cv, new Value(new byte[0]));
+      bw.addMutation(m);
+      return hash;
+    }
+
+    public static byte[] intToBytes(int l) {
+      byte[] b = new byte[4];
+      b[0] = (byte) (l >>> 24);
+      b[1] = (byte) (l >>> 16);
+      b[2] = (byte) (l >>> 8);
+      b[3] = (byte) (l >>> 0);
+      return b;
+    }
+
+    private static String getExt(String filename) {
+      if (filename.indexOf(".") == -1)
+        return null;
+      return filename.substring(filename.lastIndexOf(".") + 1);
+    }
+
+    public String hexString(byte[] bytes) {
+      StringBuilder sb = new StringBuilder();
+      for (byte b : bytes) {
+        sb.append(String.format("%02x", b));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * This iterator dedupes chunks and sets their visibilities to the combined visibility of the refs
+   * columns. For example, it would combine
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 A&amp;B V1
+   *    row1 ~chunk 0 C&amp;D V1
+   *    row1 ~chunk 0 E&amp;F V1
+   *    row1 ~chunk 0 G&amp;H V1
+   * </pre>
+   *
+   * into the following
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 (A&amp;B)|(C&amp;D) V1
+   * </pre>
+   *
+   * {@link VisibilityCombiner} is used to combine the visibilities.
+   */
+  public static class ChunkCombiner implements SortedKeyValueIterator<Key,Value> {
+    private static final Logger log = LoggerFactory.getLogger(ChunkCombiner.class);
+
+    private SortedKeyValueIterator<Key,Value> source;
+    private SortedKeyValueIterator<Key,Value> refsSource;
+    private static final Collection<ByteSequence> refsColf =
+        Collections.singleton(FileDataIngest.REFS_CF_BS);
+    private Map<Text,byte[]> lastRowVC = Collections.emptyMap();
+
+    private Key topKey = null;
+    private Value topValue = null;
+
+    public ChunkCombiner() {}
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      this.source = source;
+      this.refsSource = source.deepCopy(env);
+      log.info("DUDE call init. deepCopy on {}", source.getClass());
+    }
+
+    @Override
+    public boolean hasTop() {
+      return topKey != null;
+    }
+
+    @Override
+    public void next() throws IOException {
+      findTop();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+        throws IOException {
+      source.seek(range, columnFamilies, inclusive);
+      findTop();
+    }
+
+    private void findTop() throws IOException {
+      do {
+        topKey = null;
+        topValue = null;
+      } while (source.hasTop() && _findTop() == null);
+    }
+
+    private byte[] _findTop() throws IOException {
+      long maxTS;
+
+      topKey = new Key(source.getTopKey());
+      topValue = new Value(source.getTopValue());
+      source.next();
+
+      if (!topKey.getColumnFamilyData().equals(FileDataIngest.CHUNK_CF_BS))
+        return topKey.getColumnVisibility().getBytes();
+
+      maxTS = topKey.getTimestamp();
+
+      while (source.hasTop() && source.getTopKey().equals(topKey, PartialKey.ROW_COLFAM_COLQUAL)) {
+        if (source.getTopKey().getTimestamp() > maxTS)
+          maxTS = source.getTopKey().getTimestamp();
+
+        if (!topValue.equals(source.getTopValue()))
+          throw new RuntimeException("values not equals " + topKey + " " + source.getTopKey()
+              + " : " + diffInfo(topValue, source.getTopValue()));
+
+        source.next();
+      }
+
+      byte[] vis = getVisFromRefs();
+      if (vis != null) {
+        topKey = new Key(topKey.getRowData().toArray(), topKey.getColumnFamilyData().toArray(),
+            topKey.getColumnQualifierData().toArray(), vis, maxTS);
+      }
+      return vis;
+    }
+
+    private byte[] getVisFromRefs() throws IOException {
+      Text row = topKey.getRow();
+      if (lastRowVC.containsKey(row))
+        return lastRowVC.get(row);
+      Range range = new Range(row);
+      refsSource.seek(range, refsColf, true);
+      VisibilityCombiner vc = null;
+      while (refsSource.hasTop()) {
+        if (vc == null)
+          vc = new VisibilityCombiner();
+        vc.add(refsSource.getTopKey().getColumnVisibilityData());
+        refsSource.next();
+      }
+      if (vc == null) {
+        lastRowVC = Collections.singletonMap(row, null);
+        return null;
+      }
+      lastRowVC = Collections.singletonMap(row, vc.get());
+      return vc.get();
+    }
+
+    private String diffInfo(Value v1, Value v2) {
+      if (v1.getSize() != v2.getSize()) {
+        return "val len not equal " + v1.getSize() + "!=" + v2.getSize();
+      }
+
+      byte[] vb1 = v1.get();
+      byte[] vb2 = v2.get();
+
+      for (int i = 0; i < vb1.length; i++) {
+        if (vb1[i] != vb2[i]) {
+          return String.format("first diff at offset %,d 0x%02x != 0x%02x", i, 0xff & vb1[i],
+              0xff & vb2[i]);
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public Key getTopKey() {
+      return topKey;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return topValue;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      ChunkCombiner cc = new ChunkCombiner();
+      try {
+        cc.init(source.deepCopy(env), null, env);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return cc;
+    }
+  }
+
+  /**
+   * Copied from 2.0 Examples. A utility for merging visibilities into the form
+   * {@code (VIS1)|(VIS2)|...|(VISN)}. Used by the ChunkCombiner.
+   */
+  public static class VisibilityCombiner {
+
+    private TreeSet<String> visibilities = new TreeSet<>();
+
+    void add(ByteSequence cv) {
+      if (cv.length() == 0)
+        return;
+
+      int depth = 0;
+      int offset = 0;
+
+      for (int i = 0; i < cv.length(); i++) {
+        switch (cv.byteAt(i)) {
+          case '(':
+            depth++;
+            break;
+          case ')':
+            depth--;
+            if (depth < 0)
+              throw new IllegalArgumentException("Invalid vis " + cv);
+            break;
+          case '|':
+            if (depth == 0) {
+              insert(cv.subSequence(offset, i));
+              offset = i + 1;
+            }
+
+            break;
+        }
+      }
+
+      insert(cv.subSequence(offset, cv.length()));
+
+      if (depth != 0)
+        throw new IllegalArgumentException("Invalid vis " + cv);
+
+    }
+
+    private void insert(ByteSequence cv) {
+      for (int i = 0; i < cv.length(); i++) {
+
+      }
+
+      String cvs = cv.toString();
+
+      if (cvs.charAt(0) != '(')
+        cvs = "(" + cvs + ")";
+      else {
+        int depth = 0;
+        int depthZeroCloses = 0;
+        for (int i = 0; i < cv.length(); i++) {
+          switch (cv.byteAt(i)) {
+            case '(':
+              depth++;
+              break;
+            case ')':
+              depth--;
+              if (depth == 0)
+                depthZeroCloses++;
+              break;
+          }
+        }
+
+        if (depthZeroCloses > 1)
+          cvs = "(" + cvs + ")";
+      }
+
+      visibilities.add(cvs);
+    }
+
+    byte[] get() {
+      StringBuilder sb = new StringBuilder();
+      String sep = "";
+      for (String cvs : visibilities) {
+        sb.append(sep);
+        sep = "|";
+        sb.append(cvs);
+      }
+
+      return sb.toString().getBytes();
+    }
+  }
+
+  public static final byte[] nullbyte = new byte[] {0};
+
+  /**
+   * Join some number of strings using a null byte separator into a text object.
+   *
+   * @param s
+   *          strings
+   * @return a text object containing the strings separated by null bytes
+   */
+  public static Text buildNullSepText(String... s) {
+    Text t = new Text(s[0]);
+    for (int i = 1; i < s.length; i++) {
+      t.append(nullbyte, 0, 1);
+      t.append(s[i].getBytes(), 0, s[i].length());

Review comment:
       should be `getBytes(UTF_8)` so the result doesn't depend on the platform default charset
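   For example, a minimal sketch of what I mean (the helper name is illustrative, not the actual code in this file):

   ```java
   import static java.nio.charset.StandardCharsets.UTF_8;

   import org.apache.hadoop.io.Text;

   class NullSepTextSketch {
     // append using an explicit charset; note the encoded byte length is used, not String.length()
     static void appendString(Text t, String s) {
       byte[] bytes = s.getBytes(UTF_8);
       t.append(bytes, 0, bytes.length);
     }
   }
   ```

   Using `bytes.length` also avoids the subtle mismatch between the char count and the encoded byte count for non-ASCII input.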

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+
+      String hash = hexString(md5digest.digest());
+      Text row = new Text(hash);
+
+      // write info to accumulo
+      Mutation m = new Mutation(row);
+      m.put(REFS_CF, buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+      String fext = getExt(filename);
+      if (fext != null)
+        m.put(REFS_CF, buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+      bw.addMutation(m);
+
+      // read through file again, writing chunks to accumulo
+      int chunkCount = 0;
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          while (numRead < buf.length) {
+            int moreRead = fis.read(buf, numRead, buf.length - numRead);
+            if (moreRead > 0)
+              numRead += moreRead;
+            else if (moreRead < 0)
+              break;
+          }
+          m = new Mutation(row);
+          Text chunkCQ = new Text(chunkSizeBytes);
+          chunkCQ.append(intToBytes(chunkCount), 0, 4);
+          m.put(CHUNK_CF, chunkCQ, cv, new Value(buf, 0, numRead));
+          bw.addMutation(m);
+          if (chunkCount == Integer.MAX_VALUE)
+            throw new RuntimeException(
+                "too many chunks for file " + filename + ", try raising chunk size");
+          chunkCount++;
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+      m = new Mutation(row);
+      Text chunkCQ = new Text(chunkSizeBytes);
+      chunkCQ.append(intToBytes(chunkCount), 0, 4);
+      m.put(new Text(CHUNK_CF), chunkCQ, cv, new Value(new byte[0]));
+      bw.addMutation(m);
+      return hash;
+    }
+
+    public static byte[] intToBytes(int l) {
+      byte[] b = new byte[4];
+      b[0] = (byte) (l >>> 24);
+      b[1] = (byte) (l >>> 16);
+      b[2] = (byte) (l >>> 8);
+      b[3] = (byte) (l >>> 0);
+      return b;
+    }
+
+    private static String getExt(String filename) {
+      if (filename.indexOf(".") == -1)
+        return null;
+      return filename.substring(filename.lastIndexOf(".") + 1);
+    }
+
+    public String hexString(byte[] bytes) {
+      StringBuilder sb = new StringBuilder();
+      for (byte b : bytes) {
+        sb.append(String.format("%02x", b));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * This iterator dedupes chunks and sets their visibilities to the combined visibility of the refs
+   * columns. For example, it would combine
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 A&amp;B V1
+   *    row1 ~chunk 0 C&amp;D V1
+   *    row1 ~chunk 0 E&amp;F V1
+   *    row1 ~chunk 0 G&amp;H V1
+   * </pre>
+   *
+   * into the following
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 (A&amp;B)|(C&amp;D) V1
+   * </pre>
+   *
+   * {@link VisibilityCombiner} is used to combine the visibilities.
+   */
+  public static class ChunkCombiner implements SortedKeyValueIterator<Key,Value> {
+    private static final Logger log = LoggerFactory.getLogger(ChunkCombiner.class);
+
+    private SortedKeyValueIterator<Key,Value> source;
+    private SortedKeyValueIterator<Key,Value> refsSource;
+    private static final Collection<ByteSequence> refsColf =
+        Collections.singleton(FileDataIngest.REFS_CF_BS);
+    private Map<Text,byte[]> lastRowVC = Collections.emptyMap();
+
+    private Key topKey = null;
+    private Value topValue = null;
+
+    public ChunkCombiner() {}
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      this.source = source;
+      this.refsSource = source.deepCopy(env);
+      log.info("DUDE call init. deepCopy on {}", source.getClass());
+    }
+
+    @Override
+    public boolean hasTop() {
+      return topKey != null;
+    }
+
+    @Override
+    public void next() throws IOException {
+      findTop();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+        throws IOException {
+      source.seek(range, columnFamilies, inclusive);
+      findTop();
+    }
+
+    private void findTop() throws IOException {
+      do {
+        topKey = null;
+        topValue = null;
+      } while (source.hasTop() && _findTop() == null);
+    }
+
+    private byte[] _findTop() throws IOException {
+      long maxTS;
+
+      topKey = new Key(source.getTopKey());
+      topValue = new Value(source.getTopValue());
+      source.next();
+
+      if (!topKey.getColumnFamilyData().equals(FileDataIngest.CHUNK_CF_BS))
+        return topKey.getColumnVisibility().getBytes();
+
+      maxTS = topKey.getTimestamp();
+
+      while (source.hasTop() && source.getTopKey().equals(topKey, PartialKey.ROW_COLFAM_COLQUAL)) {
+        if (source.getTopKey().getTimestamp() > maxTS)
+          maxTS = source.getTopKey().getTimestamp();
+
+        if (!topValue.equals(source.getTopValue()))
+          throw new RuntimeException("values not equals " + topKey + " " + source.getTopKey()
+              + " : " + diffInfo(topValue, source.getTopValue()));
+
+        source.next();
+      }
+
+      byte[] vis = getVisFromRefs();
+      if (vis != null) {
+        topKey = new Key(topKey.getRowData().toArray(), topKey.getColumnFamilyData().toArray(),
+            topKey.getColumnQualifierData().toArray(), vis, maxTS);
+      }
+      return vis;
+    }
+
+    private byte[] getVisFromRefs() throws IOException {
+      Text row = topKey.getRow();
+      if (lastRowVC.containsKey(row))
+        return lastRowVC.get(row);
+      Range range = new Range(row);
+      refsSource.seek(range, refsColf, true);
+      VisibilityCombiner vc = null;
+      while (refsSource.hasTop()) {
+        if (vc == null)
+          vc = new VisibilityCombiner();
+        vc.add(refsSource.getTopKey().getColumnVisibilityData());
+        refsSource.next();
+      }
+      if (vc == null) {
+        lastRowVC = Collections.singletonMap(row, null);
+        return null;
+      }
+      lastRowVC = Collections.singletonMap(row, vc.get());
+      return vc.get();
+    }
+
+    private String diffInfo(Value v1, Value v2) {
+      if (v1.getSize() != v2.getSize()) {
+        return "val len not equal " + v1.getSize() + "!=" + v2.getSize();
+      }
+
+      byte[] vb1 = v1.get();
+      byte[] vb2 = v2.get();
+
+      for (int i = 0; i < vb1.length; i++) {
+        if (vb1[i] != vb2[i]) {
+          return String.format("first diff at offset %,d 0x%02x != 0x%02x", i, 0xff & vb1[i],
+              0xff & vb2[i]);
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public Key getTopKey() {
+      return topKey;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return topValue;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      ChunkCombiner cc = new ChunkCombiner();
+      try {
+        cc.init(source.deepCopy(env), null, env);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return cc;
+    }
+  }
+
+  /**
+   * Copied from 2.0 Examples. A utility for merging visibilities into the form
+   * {@code (VIS1)|(VIS2)|...|(VISN)}. Used by the ChunkCombiner.
+   */
+  public static class VisibilityCombiner {
+
+    private TreeSet<String> visibilities = new TreeSet<>();
+
+    void add(ByteSequence cv) {
+      if (cv.length() == 0)
+        return;
+
+      int depth = 0;
+      int offset = 0;
+
+      for (int i = 0; i < cv.length(); i++) {
+        switch (cv.byteAt(i)) {
+          case '(':
+            depth++;
+            break;
+          case ')':
+            depth--;
+            if (depth < 0)
+              throw new IllegalArgumentException("Invalid vis " + cv);
+            break;
+          case '|':
+            if (depth == 0) {
+              insert(cv.subSequence(offset, i));
+              offset = i + 1;
+            }
+
+            break;
+        }
+      }
+
+      insert(cv.subSequence(offset, cv.length()));
+
+      if (depth != 0)
+        throw new IllegalArgumentException("Invalid vis " + cv);
+
+    }
+
+    private void insert(ByteSequence cv) {
+      for (int i = 0; i < cv.length(); i++) {
+
+      }
+
+      String cvs = cv.toString();
+
+      if (cvs.charAt(0) != '(')
+        cvs = "(" + cvs + ")";
+      else {
+        int depth = 0;
+        int depthZeroCloses = 0;
+        for (int i = 0; i < cv.length(); i++) {
+          switch (cv.byteAt(i)) {
+            case '(':
+              depth++;
+              break;
+            case ')':
+              depth--;
+              if (depth == 0)
+                depthZeroCloses++;
+              break;
+          }
+        }
+
+        if (depthZeroCloses > 1)
+          cvs = "(" + cvs + ")";
+      }
+
+      visibilities.add(cvs);
+    }
+
+    byte[] get() {
+      StringBuilder sb = new StringBuilder();
+      String sep = "";
+      for (String cvs : visibilities) {
+        sb.append(sep);
+        sep = "|";
+        sb.append(cvs);
+      }
+
+      return sb.toString().getBytes();

Review comment:
       Should be `getBytes(UTF_8)` here as well, rather than relying on the platform default charset

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/InterruptibleIterator.java
##########
@@ -16,14 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.iteratorsImpl.system;
+package org.apache.accumulo.core.iterators;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 
+/**
+ * Allows an iterator to be interrupted. Typically, once the interrupt flag is set the iterator will
+ * throw an {@link InterruptedException} if the interrupt is detected. Some iterators have been
+ * optimized to not always check the flag.
+ *
+ * One example of a system interrupt is when a Tablet is being closed. If a Tablet has an active

Review comment:
       Blank line in javadoc without a `<p>` isn't correct; the javadoc tool collapses it, so the intended paragraph break won't render
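   Something like this (a sketch with the wording abbreviated) renders the intended break:

   ```java
   /**
    * Allows an iterator to be interrupted.
    * <p>
    * One example of a system interrupt is when a Tablet is being closed.
    */
   interface Example {}
   ```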

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
##########
@@ -36,28 +37,40 @@
  *
  * Note that this class is intended as an in-memory replacement for RFile$Reader, so its behavior
  * reflects the same assumptions; namely, that this iterator is not responsible for respecting the
- * columnFamilies passed into seek().
+ * columnFamilies passed into seek(). If you want a Map-backed Iterator that returns only sought
+ * CFs, construct a new ColumnFamilySkippingIterator(new SortedMapIterator(map)).
+ *
+ * @see org.apache.accumulo.core.iterators.ColumnFamilySkippingIterator
  */
-public class SortedMapIterator implements SortedKeyValueIterator<Key,Value> {
+public class SortedMapIterator implements InterruptibleIterator {
   private Iterator<Entry<Key,Value>> iter;
   private Entry<Key,Value> entry;
 
   private SortedMap<Key,Value> map;
   private Range range;
 
+  private AtomicBoolean interruptFlag;

Review comment:
       Atomic references are typically `final` whenever possible, to prevent bugs where atomicity is broken because the reference is changed. I see this can't be made final because there's a setter... but that seems like bad API design. Is there something we can do to fix this?
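   One shape that would allow the reference to be `final` (just a sketch of the idea, not a drop-in change, since the existing `setInterruptFlag` contract would have to move to construction or `deepCopy`):

   ```java
   import java.util.Objects;
   import java.util.concurrent.atomic.AtomicBoolean;

   // hypothetical: accept the flag at construction so the reference never changes
   class InterruptAwareSketch {
     private final AtomicBoolean interruptFlag;

     InterruptAwareSketch(AtomicBoolean interruptFlag) {
       this.interruptFlag = Objects.requireNonNull(interruptFlag);
     }

     boolean isInterrupted() {
       return interruptFlag.get();
     }
   }
   ```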

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }

Review comment:
       Our test file is small enough that this can be reduced to two simple lines... one to read the file fully into a String, and another to compute the digest from the String's bytes. This would improve the comprehensibility of the test, since we don't actually need to chunk and buffer the tiny test file.
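   Something along these lines (a sketch; the method and class names are illustrative, assuming the test file is small and UTF-8):

   ```java
   import static java.nio.charset.StandardCharsets.UTF_8;

   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.security.MessageDigest;
   import java.security.NoSuchAlgorithmException;

   class DigestSketch {
     static String md5Hex(String filename) throws IOException, NoSuchAlgorithmException {
       // read the whole (tiny) test file, then digest its bytes in one shot
       String contents = Files.readString(Path.of(filename), UTF_8);
       byte[] digest = MessageDigest.getInstance("MD5").digest(contents.getBytes(UTF_8));
       StringBuilder sb = new StringBuilder();
       for (byte b : digest) {
         sb.append(String.format("%02x", b));
       }
       return sb.toString();
     }
   }
   ```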

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+
+      String hash = hexString(md5digest.digest());
+      Text row = new Text(hash);
+
+      // write info to accumulo
+      Mutation m = new Mutation(row);
+      m.put(REFS_CF, buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+      String fext = getExt(filename);
+      if (fext != null)
+        m.put(REFS_CF, buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+      bw.addMutation(m);
+
+      // read through file again, writing chunks to accumulo
+      int chunkCount = 0;
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          while (numRead < buf.length) {
+            int moreRead = fis.read(buf, numRead, buf.length - numRead);
+            if (moreRead > 0)
+              numRead += moreRead;
+            else if (moreRead < 0)
+              break;
+          }
+          m = new Mutation(row);
+          Text chunkCQ = new Text(chunkSizeBytes);
+          chunkCQ.append(intToBytes(chunkCount), 0, 4);
+          m.put(CHUNK_CF, chunkCQ, cv, new Value(buf, 0, numRead));
+          bw.addMutation(m);
+          if (chunkCount == Integer.MAX_VALUE)
+            throw new RuntimeException(
+                "too many chunks for file " + filename + ", try raising chunk size");
+          chunkCount++;
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+      m = new Mutation(row);
+      Text chunkCQ = new Text(chunkSizeBytes);
+      chunkCQ.append(intToBytes(chunkCount), 0, 4);
+      m.put(new Text(CHUNK_CF), chunkCQ, cv, new Value(new byte[0]));
+      bw.addMutation(m);
+      return hash;
+    }
+
+    public static byte[] intToBytes(int l) {
+      byte[] b = new byte[4];
+      b[0] = (byte) (l >>> 24);
+      b[1] = (byte) (l >>> 16);
+      b[2] = (byte) (l >>> 8);
+      b[3] = (byte) (l >>> 0);
+      return b;
+    }
+
+    private static String getExt(String filename) {
+      if (filename.indexOf(".") == -1)
+        return null;
+      return filename.substring(filename.lastIndexOf(".") + 1);
+    }
+
+    public String hexString(byte[] bytes) {
+      StringBuilder sb = new StringBuilder();
+      for (byte b : bytes) {
+        sb.append(String.format("%02x", b));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * This iterator dedupes chunks and sets their visibilities to the combined visibility of the refs
+   * columns. For example, it would combine
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 A&amp;B V1
+   *    row1 ~chunk 0 C&amp;D V1
+   *    row1 ~chunk 0 E&amp;F V1
+   *    row1 ~chunk 0 G&amp;H V1
+   * </pre>
+   *
+   * into the following
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 (A&amp;B)|(C&amp;D) V1
+   * </pre>
+   *
+   * {@link VisibilityCombiner} is used to combine the visibilities.
+   */
+  public static class ChunkCombiner implements SortedKeyValueIterator<Key,Value> {
+    private static final Logger log = LoggerFactory.getLogger(ChunkCombiner.class);
+
+    private SortedKeyValueIterator<Key,Value> source;
+    private SortedKeyValueIterator<Key,Value> refsSource;
+    private static final Collection<ByteSequence> refsColf =
+        Collections.singleton(FileDataIngest.REFS_CF_BS);
+    private Map<Text,byte[]> lastRowVC = Collections.emptyMap();
+
+    private Key topKey = null;
+    private Value topValue = null;
+
+    public ChunkCombiner() {}
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      this.source = source;
+      this.refsSource = source.deepCopy(env);
+      log.info("DUDE call init. deepCopy on {}", source.getClass());
+    }
+
+    @Override
+    public boolean hasTop() {
+      return topKey != null;
+    }
+
+    @Override
+    public void next() throws IOException {
+      findTop();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+        throws IOException {
+      source.seek(range, columnFamilies, inclusive);
+      findTop();
+    }
+
+    private void findTop() throws IOException {
+      do {
+        topKey = null;
+        topValue = null;
+      } while (source.hasTop() && _findTop() == null);
+    }
+
+    private byte[] _findTop() throws IOException {
+      long maxTS;
+
+      topKey = new Key(source.getTopKey());
+      topValue = new Value(source.getTopValue());
+      source.next();
+
+      if (!topKey.getColumnFamilyData().equals(FileDataIngest.CHUNK_CF_BS))
+        return topKey.getColumnVisibility().getBytes();
+
+      maxTS = topKey.getTimestamp();
+
+      while (source.hasTop() && source.getTopKey().equals(topKey, PartialKey.ROW_COLFAM_COLQUAL)) {
+        if (source.getTopKey().getTimestamp() > maxTS)
+          maxTS = source.getTopKey().getTimestamp();
+
+        if (!topValue.equals(source.getTopValue()))
+          throw new RuntimeException("values not equals " + topKey + " " + source.getTopKey()
+              + " : " + diffInfo(topValue, source.getTopValue()));
+
+        source.next();
+      }
+
+      byte[] vis = getVisFromRefs();
+      if (vis != null) {
+        topKey = new Key(topKey.getRowData().toArray(), topKey.getColumnFamilyData().toArray(),
+            topKey.getColumnQualifierData().toArray(), vis, maxTS);
+      }
+      return vis;
+    }
+
+    private byte[] getVisFromRefs() throws IOException {
+      Text row = topKey.getRow();
+      if (lastRowVC.containsKey(row))
+        return lastRowVC.get(row);
+      Range range = new Range(row);
+      refsSource.seek(range, refsColf, true);
+      VisibilityCombiner vc = null;
+      while (refsSource.hasTop()) {
+        if (vc == null)
+          vc = new VisibilityCombiner();
+        vc.add(refsSource.getTopKey().getColumnVisibilityData());
+        refsSource.next();
+      }
+      if (vc == null) {
+        lastRowVC = Collections.singletonMap(row, null);
+        return null;
+      }
+      lastRowVC = Collections.singletonMap(row, vc.get());
+      return vc.get();
+    }
+
+    private String diffInfo(Value v1, Value v2) {
+      if (v1.getSize() != v2.getSize()) {
+        return "val len not equal " + v1.getSize() + "!=" + v2.getSize();
+      }
+
+      byte[] vb1 = v1.get();
+      byte[] vb2 = v2.get();
+
+      for (int i = 0; i < vb1.length; i++) {
+        if (vb1[i] != vb2[i]) {
+          return String.format("first diff at offset %,d 0x%02x != 0x%02x", i, 0xff & vb1[i],
+              0xff & vb2[i]);
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public Key getTopKey() {
+      return topKey;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return topValue;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      ChunkCombiner cc = new ChunkCombiner();
+      try {
+        cc.init(source.deepCopy(env), null, env);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return cc;
+    }
+  }
+
+  /**
+   * Copied from 2.0 Examples. A utility for merging visibilities into the form
+   * {@code (VIS1)|(VIS2)|...|(VISN)}. Used by the ChunkCombiner.
+   */
+  public static class VisibilityCombiner {
+
+    private TreeSet<String> visibilities = new TreeSet<>();
+
+    void add(ByteSequence cv) {
+      if (cv.length() == 0)
+        return;
+
+      int depth = 0;
+      int offset = 0;
+
+      for (int i = 0; i < cv.length(); i++) {
+        switch (cv.byteAt(i)) {
+          case '(':
+            depth++;
+            break;
+          case ')':
+            depth--;
+            if (depth < 0)
+              throw new IllegalArgumentException("Invalid vis " + cv);
+            break;
+          case '|':
+            if (depth == 0) {
+              insert(cv.subSequence(offset, i));
+              offset = i + 1;
+            }
+
+            break;
+        }
+      }
+
+      insert(cv.subSequence(offset, cv.length()));
+
+      if (depth != 0)
+        throw new IllegalArgumentException("Invalid vis " + cv);
+
+    }
+
+    private void insert(ByteSequence cv) {
+      for (int i = 0; i < cv.length(); i++) {
+
+      }
+
+      String cvs = cv.toString();
+
+      if (cvs.charAt(0) != '(')
+        cvs = "(" + cvs + ")";
+      else {
+        int depth = 0;
+        int depthZeroCloses = 0;
+        for (int i = 0; i < cv.length(); i++) {
+          switch (cv.byteAt(i)) {
+            case '(':
+              depth++;
+              break;
+            case ')':
+              depth--;
+              if (depth == 0)
+                depthZeroCloses++;
+              break;
+          }
+        }
+
+        if (depthZeroCloses > 1)
+          cvs = "(" + cvs + ")";
+      }
+
+      visibilities.add(cvs);
+    }
+
+    byte[] get() {
+      StringBuilder sb = new StringBuilder();
+      String sep = "";
+      for (String cvs : visibilities) {
+        sb.append(sep);
+        sep = "|";
+        sb.append(cvs);
+      }
+
+      return sb.toString().getBytes();
+    }
+  }
+
+  public static final byte[] nullbyte = new byte[] {0};
+
+  /**
+   * Join some number of strings using a null byte separator into a text object.
+   *
+   * @param s
+   *          strings
+   * @return a text object containing the strings separated by null bytes
+   */

Review comment:
       This could probably just be a one-line inline comment. We're not publishing javadocs for ITs. This method can probably also be private... as could some others added in this file for testing.

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
##########
@@ -95,6 +111,9 @@ public void next() throws IOException {
   public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
       throws IOException {
 
+    if (interruptFlag != null && interruptFlag.get())

Review comment:
       `interruptFlag` should never be null. We can require non-null anywhere we accept it.
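   For example (a minimal sketch; field and class names are illustrative, not the actual `SortedMapIterator` code):

   ```java
   import java.util.Objects;
   import java.util.concurrent.atomic.AtomicBoolean;

   class NonNullFlagSketch {
     // start with a flag that is simply never set, instead of null
     private AtomicBoolean interruptFlag = new AtomicBoolean(false);

     void setInterruptFlag(AtomicBoolean flag) {
       this.interruptFlag = Objects.requireNonNull(flag, "interruptFlag must not be null");
     }

     void seekCheck() {
       // no null check needed here anymore
       if (interruptFlag.get()) {
         throw new IllegalStateException("iteration interrupted");
       }
     }
   }
   ```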

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+
+      String hash = hexString(md5digest.digest());
+      Text row = new Text(hash);
+
+      // write info to accumulo
+      Mutation m = new Mutation(row);
+      m.put(REFS_CF, buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+      String fext = getExt(filename);
+      if (fext != null)
+        m.put(REFS_CF, buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+      bw.addMutation(m);
+
+      // read through file again, writing chunks to accumulo
+      int chunkCount = 0;
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          while (numRead < buf.length) {
+            int moreRead = fis.read(buf, numRead, buf.length - numRead);
+            if (moreRead > 0)
+              numRead += moreRead;
+            else if (moreRead < 0)
+              break;
+          }
+          m = new Mutation(row);
+          Text chunkCQ = new Text(chunkSizeBytes);
+          chunkCQ.append(intToBytes(chunkCount), 0, 4);
+          m.put(CHUNK_CF, chunkCQ, cv, new Value(buf, 0, numRead));
+          bw.addMutation(m);
+          if (chunkCount == Integer.MAX_VALUE)
+            throw new RuntimeException(
+                "too many chunks for file " + filename + ", try raising chunk size");
+          chunkCount++;
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+      m = new Mutation(row);
+      Text chunkCQ = new Text(chunkSizeBytes);
+      chunkCQ.append(intToBytes(chunkCount), 0, 4);
+      m.put(new Text(CHUNK_CF), chunkCQ, cv, new Value(new byte[0]));
+      bw.addMutation(m);
+      return hash;
+    }
+
+    public static byte[] intToBytes(int l) {
+      byte[] b = new byte[4];
+      b[0] = (byte) (l >>> 24);
+      b[1] = (byte) (l >>> 16);
+      b[2] = (byte) (l >>> 8);
+      b[3] = (byte) (l >>> 0);
+      return b;
+    }
+
+    private static String getExt(String filename) {
+      if (filename.indexOf(".") == -1)
+        return null;
+      return filename.substring(filename.lastIndexOf(".") + 1);
+    }
+
+    public String hexString(byte[] bytes) {
+      StringBuilder sb = new StringBuilder();
+      for (byte b : bytes) {
+        sb.append(String.format("%02x", b));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * This iterator dedupes chunks and sets their visibilities to the combined visibility of the refs
+   * columns. For example, it would combine
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 A&amp;B V1
+   *    row1 ~chunk 0 C&amp;D V1
+   *    row1 ~chunk 0 E&amp;F V1
+   *    row1 ~chunk 0 G&amp;H V1
+   * </pre>
+   *
+   * into the following
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 (A&amp;B)|(C&amp;D) V1
+   * </pre>
+   *
+   * {@link VisibilityCombiner} is used to combine the visibilities.
+   */
+  public static class ChunkCombiner implements SortedKeyValueIterator<Key,Value> {
+    private static final Logger log = LoggerFactory.getLogger(ChunkCombiner.class);
+
+    private SortedKeyValueIterator<Key,Value> source;
+    private SortedKeyValueIterator<Key,Value> refsSource;
+    private static final Collection<ByteSequence> refsColf =
+        Collections.singleton(FileDataIngest.REFS_CF_BS);
+    private Map<Text,byte[]> lastRowVC = Collections.emptyMap();
+
+    private Key topKey = null;
+    private Value topValue = null;
+
+    public ChunkCombiner() {}
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      this.source = source;
+      this.refsSource = source.deepCopy(env);
+      log.info("DUDE call init. deepCopy on {}", source.getClass());
+    }
+
+    @Override
+    public boolean hasTop() {
+      return topKey != null;
+    }
+
+    @Override
+    public void next() throws IOException {
+      findTop();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+        throws IOException {
+      source.seek(range, columnFamilies, inclusive);
+      findTop();
+    }
+
+    private void findTop() throws IOException {
+      do {
+        topKey = null;
+        topValue = null;
+      } while (source.hasTop() && _findTop() == null);
+    }
+
+    private byte[] _findTop() throws IOException {
+      long maxTS;
+
+      topKey = new Key(source.getTopKey());
+      topValue = new Value(source.getTopValue());
+      source.next();
+
+      if (!topKey.getColumnFamilyData().equals(FileDataIngest.CHUNK_CF_BS))
+        return topKey.getColumnVisibility().getBytes();
+
+      maxTS = topKey.getTimestamp();
+
+      while (source.hasTop() && source.getTopKey().equals(topKey, PartialKey.ROW_COLFAM_COLQUAL)) {
+        if (source.getTopKey().getTimestamp() > maxTS)
+          maxTS = source.getTopKey().getTimestamp();
+
+        if (!topValue.equals(source.getTopValue()))
+          throw new RuntimeException("values not equals " + topKey + " " + source.getTopKey()
+              + " : " + diffInfo(topValue, source.getTopValue()));
+
+        source.next();
+      }
+
+      byte[] vis = getVisFromRefs();
+      if (vis != null) {
+        topKey = new Key(topKey.getRowData().toArray(), topKey.getColumnFamilyData().toArray(),
+            topKey.getColumnQualifierData().toArray(), vis, maxTS);
+      }
+      return vis;
+    }
+
+    private byte[] getVisFromRefs() throws IOException {
+      Text row = topKey.getRow();
+      if (lastRowVC.containsKey(row))
+        return lastRowVC.get(row);
+      Range range = new Range(row);
+      refsSource.seek(range, refsColf, true);
+      VisibilityCombiner vc = null;
+      while (refsSource.hasTop()) {
+        if (vc == null)
+          vc = new VisibilityCombiner();
+        vc.add(refsSource.getTopKey().getColumnVisibilityData());
+        refsSource.next();
+      }
+      if (vc == null) {
+        lastRowVC = Collections.singletonMap(row, null);
+        return null;
+      }
+      lastRowVC = Collections.singletonMap(row, vc.get());
+      return vc.get();
+    }
+
+    private String diffInfo(Value v1, Value v2) {
+      if (v1.getSize() != v2.getSize()) {
+        return "val len not equal " + v1.getSize() + "!=" + v2.getSize();
+      }
+
+      byte[] vb1 = v1.get();
+      byte[] vb2 = v2.get();
+
+      for (int i = 0; i < vb1.length; i++) {
+        if (vb1[i] != vb2[i]) {
+          return String.format("first diff at offset %,d 0x%02x != 0x%02x", i, 0xff & vb1[i],
+              0xff & vb2[i]);
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public Key getTopKey() {
+      return topKey;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return topValue;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      ChunkCombiner cc = new ChunkCombiner();
+      try {
+        cc.init(source.deepCopy(env), null, env);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return cc;
+    }
+  }
+
+  /**
+   * Copied from 2.0 Examples. A utility for merging visibilities into the form
+   * {@code (VIS1)|(VIS2)|...|(VISN)}. Used by the ChunkCombiner.
+   */
+  public static class VisibilityCombiner {
+
+    private TreeSet<String> visibilities = new TreeSet<>();
+
+    void add(ByteSequence cv) {
+      if (cv.length() == 0)
+        return;
+
+      int depth = 0;
+      int offset = 0;
+
+      for (int i = 0; i < cv.length(); i++) {
+        switch (cv.byteAt(i)) {
+          case '(':
+            depth++;
+            break;
+          case ')':
+            depth--;
+            if (depth < 0)
+              throw new IllegalArgumentException("Invalid vis " + cv);
+            break;
+          case '|':
+            if (depth == 0) {
+              insert(cv.subSequence(offset, i));
+              offset = i + 1;
+            }
+
+            break;
+        }
+      }
+
+      insert(cv.subSequence(offset, cv.length()));
+
+      if (depth != 0)
+        throw new IllegalArgumentException("Invalid vis " + cv);
+
+    }
+
+    private void insert(ByteSequence cv) {
+      for (int i = 0; i < cv.length(); i++) {
+
+      }

Review comment:
       What is this loop doing?

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilySkippingIterator.java
##########
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.iteratorsImpl.system;
+package org.apache.accumulo.core.iterators;

Review comment:
       Moving the interface back to public API seems like the obvious fix, but are we expecting users to actually use this interface? I haven't followed all the details, but my understanding was that this was a feature intended solely for internal use with some of our own system iterators. But, maybe I'm wrong? Is this something that is useful to user iterators and safe for them to use?

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+
+      String hash = hexString(md5digest.digest());
+      Text row = new Text(hash);
+
+      // write info to accumulo
+      Mutation m = new Mutation(row);
+      m.put(REFS_CF, buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+      String fext = getExt(filename);
+      if (fext != null)
+        m.put(REFS_CF, buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+      bw.addMutation(m);
+
+      // read through file again, writing chunks to accumulo
+      int chunkCount = 0;
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          while (numRead < buf.length) {
+            int moreRead = fis.read(buf, numRead, buf.length - numRead);
+            if (moreRead > 0)
+              numRead += moreRead;
+            else if (moreRead < 0)
+              break;
+          }
+          m = new Mutation(row);
+          Text chunkCQ = new Text(chunkSizeBytes);
+          chunkCQ.append(intToBytes(chunkCount), 0, 4);
+          m.put(CHUNK_CF, chunkCQ, cv, new Value(buf, 0, numRead));
+          bw.addMutation(m);
+          if (chunkCount == Integer.MAX_VALUE)
+            throw new RuntimeException(
+                "too many chunks for file " + filename + ", try raising chunk size");
+          chunkCount++;
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+      m = new Mutation(row);
+      Text chunkCQ = new Text(chunkSizeBytes);
+      chunkCQ.append(intToBytes(chunkCount), 0, 4);
+      m.put(new Text(CHUNK_CF), chunkCQ, cv, new Value(new byte[0]));
+      bw.addMutation(m);
+      return hash;
+    }
+
+    public static byte[] intToBytes(int l) {
+      byte[] b = new byte[4];
+      b[0] = (byte) (l >>> 24);
+      b[1] = (byte) (l >>> 16);
+      b[2] = (byte) (l >>> 8);
+      b[3] = (byte) (l >>> 0);
+      return b;
+    }
+
+    private static String getExt(String filename) {
+      if (filename.indexOf(".") == -1)
+        return null;
+      return filename.substring(filename.lastIndexOf(".") + 1);
+    }
+
+    public String hexString(byte[] bytes) {
+      StringBuilder sb = new StringBuilder();
+      for (byte b : bytes) {
+        sb.append(String.format("%02x", b));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * This iterator dedupes chunks and sets their visibilities to the combined visibility of the refs
+   * columns. For example, it would combine
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 A&amp;B V1
+   *    row1 ~chunk 0 C&amp;D V1
+   *    row1 ~chunk 0 E&amp;F V1
+   *    row1 ~chunk 0 G&amp;H V1
+   * </pre>
+   *
+   * into the following
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 (A&amp;B)|(C&amp;D) V1
+   * </pre>
+   *
+   * {@link VisibilityCombiner} is used to combine the visibilities.
+   */
+  public static class ChunkCombiner implements SortedKeyValueIterator<Key,Value> {
+    private static final Logger log = LoggerFactory.getLogger(ChunkCombiner.class);
+
+    private SortedKeyValueIterator<Key,Value> source;
+    private SortedKeyValueIterator<Key,Value> refsSource;
+    private static final Collection<ByteSequence> refsColf =
+        Collections.singleton(FileDataIngest.REFS_CF_BS);
+    private Map<Text,byte[]> lastRowVC = Collections.emptyMap();
+
+    private Key topKey = null;
+    private Value topValue = null;
+
+    public ChunkCombiner() {}
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      this.source = source;
+      this.refsSource = source.deepCopy(env);
+      log.info("DUDE call init. deepCopy on {}", source.getClass());
+    }
+
+    @Override
+    public boolean hasTop() {
+      return topKey != null;
+    }
+
+    @Override
+    public void next() throws IOException {
+      findTop();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+        throws IOException {
+      source.seek(range, columnFamilies, inclusive);
+      findTop();
+    }
+
+    private void findTop() throws IOException {
+      do {
+        topKey = null;
+        topValue = null;
+      } while (source.hasTop() && _findTop() == null);
+    }
+
+    private byte[] _findTop() throws IOException {
+      long maxTS;
+
+      topKey = new Key(source.getTopKey());
+      topValue = new Value(source.getTopValue());
+      source.next();
+
+      if (!topKey.getColumnFamilyData().equals(FileDataIngest.CHUNK_CF_BS))
+        return topKey.getColumnVisibility().getBytes();
+
+      maxTS = topKey.getTimestamp();
+
+      while (source.hasTop() && source.getTopKey().equals(topKey, PartialKey.ROW_COLFAM_COLQUAL)) {
+        if (source.getTopKey().getTimestamp() > maxTS)
+          maxTS = source.getTopKey().getTimestamp();
+
+        if (!topValue.equals(source.getTopValue()))
+          throw new RuntimeException("values not equals " + topKey + " " + source.getTopKey()
+              + " : " + diffInfo(topValue, source.getTopValue()));
+
+        source.next();
+      }
+
+      byte[] vis = getVisFromRefs();
+      if (vis != null) {
+        topKey = new Key(topKey.getRowData().toArray(), topKey.getColumnFamilyData().toArray(),
+            topKey.getColumnQualifierData().toArray(), vis, maxTS);
+      }
+      return vis;
+    }
+
+    private byte[] getVisFromRefs() throws IOException {
+      Text row = topKey.getRow();
+      if (lastRowVC.containsKey(row))
+        return lastRowVC.get(row);
+      Range range = new Range(row);
+      refsSource.seek(range, refsColf, true);
+      VisibilityCombiner vc = null;
+      while (refsSource.hasTop()) {
+        if (vc == null)
+          vc = new VisibilityCombiner();
+        vc.add(refsSource.getTopKey().getColumnVisibilityData());
+        refsSource.next();
+      }
+      if (vc == null) {
+        lastRowVC = Collections.singletonMap(row, null);
+        return null;
+      }
+      lastRowVC = Collections.singletonMap(row, vc.get());
+      return vc.get();
+    }
+
+    private String diffInfo(Value v1, Value v2) {
+      if (v1.getSize() != v2.getSize()) {
+        return "val len not equal " + v1.getSize() + "!=" + v2.getSize();
+      }
+
+      byte[] vb1 = v1.get();
+      byte[] vb2 = v2.get();
+
+      for (int i = 0; i < vb1.length; i++) {
+        if (vb1[i] != vb2[i]) {
+          return String.format("first diff at offset %,d 0x%02x != 0x%02x", i, 0xff & vb1[i],
+              0xff & vb2[i]);
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public Key getTopKey() {
+      return topKey;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return topValue;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      ChunkCombiner cc = new ChunkCombiner();
+      try {
+        cc.init(source.deepCopy(env), null, env);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return cc;
+    }
+  }
+
+  /**
+   * Copied from 2.0 Examples. A utility for merging visibilities into the form
+   * {@code (VIS1)|(VIS2)|...|(VISN)}. Used by the ChunkCombiner.
+   */
+  public static class VisibilityCombiner {
+
+    private TreeSet<String> visibilities = new TreeSet<>();
+
+    void add(ByteSequence cv) {
+      if (cv.length() == 0)
+        return;
+
+      int depth = 0;
+      int offset = 0;
+
+      for (int i = 0; i < cv.length(); i++) {
+        switch (cv.byteAt(i)) {
+          case '(':
+            depth++;
+            break;
+          case ')':
+            depth--;
+            if (depth < 0)
+              throw new IllegalArgumentException("Invalid vis " + cv);
+            break;
+          case '|':
+            if (depth == 0) {
+              insert(cv.subSequence(offset, i));
+              offset = i + 1;
+            }
+
+            break;
+        }
+      }
+
+      insert(cv.subSequence(offset, cv.length()));
+
+      if (depth != 0)
+        throw new IllegalArgumentException("Invalid vis " + cv);
+
+    }
+
+    private void insert(ByteSequence cv) {
+      for (int i = 0; i < cv.length(); i++) {
+
+      }
+
+      String cvs = cv.toString();
+
+      if (cvs.charAt(0) != '(')
+        cvs = "(" + cvs + ")";
+      else {
+        int depth = 0;
+        int depthZeroCloses = 0;
+        for (int i = 0; i < cv.length(); i++) {
+          switch (cv.byteAt(i)) {
+            case '(':
+              depth++;
+              break;
+            case ')':
+              depth--;
+              if (depth == 0)
+                depthZeroCloses++;
+              break;
+          }
+        }
+
+        if (depthZeroCloses > 1)
+          cvs = "(" + cvs + ")";
+      }
+
+      visibilities.add(cvs);
+    }
+
+    byte[] get() {
+      StringBuilder sb = new StringBuilder();
+      String sep = "";
+      for (String cvs : visibilities) {
+        sb.append(sep);
+        sep = "|";
+        sb.append(cvs);
+      }
+
+      return sb.toString().getBytes();
+    }
+  }
+
+  public static final byte[] nullbyte = new byte[] {0};
+
+  /**
+   * Join some number of strings using a null byte separator into a text object.
+   *
+   * @param s
+   *          strings
+   * @return a text object containing the strings separated by null bytes
+   */
+  public static Text buildNullSepText(String... s) {
+    Text t = new Text(s[0]);
+    for (int i = 1; i < s.length; i++) {
+      t.append(nullbyte, 0, 1);
+      t.append(s[i].getBytes(), 0, s[i].length());

Review comment:
       should be `getBytes(UTF_8)`
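       Roughly, as a suggestion (assumes a static import of `java.nio.charset.StandardCharsets.UTF_8`):
       ```java
       byte[] bytes = s[i].getBytes(UTF_8);
       t.append(bytes, 0, bytes.length);
       ```
       Using the encoded byte length here also avoids assuming the string length equals the number of bytes.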

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+
+      String hash = hexString(md5digest.digest());
+      Text row = new Text(hash);
+
+      // write info to accumulo
+      Mutation m = new Mutation(row);
+      m.put(REFS_CF, buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+      String fext = getExt(filename);
+      if (fext != null)
+        m.put(REFS_CF, buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+      bw.addMutation(m);
+
+      // read through file again, writing chunks to accumulo
+      int chunkCount = 0;
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          while (numRead < buf.length) {
+            int moreRead = fis.read(buf, numRead, buf.length - numRead);
+            if (moreRead > 0)
+              numRead += moreRead;
+            else if (moreRead < 0)
+              break;
+          }
+          m = new Mutation(row);
+          Text chunkCQ = new Text(chunkSizeBytes);
+          chunkCQ.append(intToBytes(chunkCount), 0, 4);
+          m.put(CHUNK_CF, chunkCQ, cv, new Value(buf, 0, numRead));
+          bw.addMutation(m);
+          if (chunkCount == Integer.MAX_VALUE)
+            throw new RuntimeException(
+                "too many chunks for file " + filename + ", try raising chunk size");
+          chunkCount++;
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+      m = new Mutation(row);
+      Text chunkCQ = new Text(chunkSizeBytes);
+      chunkCQ.append(intToBytes(chunkCount), 0, 4);
+      m.put(new Text(CHUNK_CF), chunkCQ, cv, new Value(new byte[0]));
+      bw.addMutation(m);
+      return hash;
+    }
+
+    public static byte[] intToBytes(int l) {
+      byte[] b = new byte[4];
+      b[0] = (byte) (l >>> 24);
+      b[1] = (byte) (l >>> 16);
+      b[2] = (byte) (l >>> 8);
+      b[3] = (byte) (l >>> 0);
+      return b;
+    }
+
+    private static String getExt(String filename) {
+      if (filename.indexOf(".") == -1)
+        return null;
+      return filename.substring(filename.lastIndexOf(".") + 1);
+    }
+
+    public String hexString(byte[] bytes) {
+      StringBuilder sb = new StringBuilder();
+      for (byte b : bytes) {
+        sb.append(String.format("%02x", b));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * This iterator dedupes chunks and sets their visibilities to the combined visibility of the refs
+   * columns. For example, it would combine
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 A&amp;B V1
+   *    row1 ~chunk 0 C&amp;D V1
+   *    row1 ~chunk 0 E&amp;F V1
+   *    row1 ~chunk 0 G&amp;H V1
+   * </pre>
+   *
+   * into the following
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 (A&amp;B)|(C&amp;D) V1
+   * </pre>
+   *
+   * {@link VisibilityCombiner} is used to combine the visibilities.
+   */
+  public static class ChunkCombiner implements SortedKeyValueIterator<Key,Value> {
+    private static final Logger log = LoggerFactory.getLogger(ChunkCombiner.class);
+
+    private SortedKeyValueIterator<Key,Value> source;
+    private SortedKeyValueIterator<Key,Value> refsSource;
+    private static final Collection<ByteSequence> refsColf =
+        Collections.singleton(FileDataIngest.REFS_CF_BS);
+    private Map<Text,byte[]> lastRowVC = Collections.emptyMap();
+
+    private Key topKey = null;
+    private Value topValue = null;
+
+    public ChunkCombiner() {}
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      this.source = source;
+      this.refsSource = source.deepCopy(env);
+      log.info("DUDE call init. deepCopy on {}", source.getClass());
+    }
+
+    @Override
+    public boolean hasTop() {
+      return topKey != null;
+    }
+
+    @Override
+    public void next() throws IOException {
+      findTop();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+        throws IOException {
+      source.seek(range, columnFamilies, inclusive);
+      findTop();
+    }
+
+    private void findTop() throws IOException {
+      do {
+        topKey = null;
+        topValue = null;
+      } while (source.hasTop() && _findTop() == null);
+    }
+
+    private byte[] _findTop() throws IOException {
+      long maxTS;
+
+      topKey = new Key(source.getTopKey());
+      topValue = new Value(source.getTopValue());
+      source.next();
+
+      if (!topKey.getColumnFamilyData().equals(FileDataIngest.CHUNK_CF_BS))
+        return topKey.getColumnVisibility().getBytes();
+
+      maxTS = topKey.getTimestamp();
+
+      while (source.hasTop() && source.getTopKey().equals(topKey, PartialKey.ROW_COLFAM_COLQUAL)) {
+        if (source.getTopKey().getTimestamp() > maxTS)
+          maxTS = source.getTopKey().getTimestamp();
+
+        if (!topValue.equals(source.getTopValue()))
+          throw new RuntimeException("values not equals " + topKey + " " + source.getTopKey()
+              + " : " + diffInfo(topValue, source.getTopValue()));
+
+        source.next();
+      }
+
+      byte[] vis = getVisFromRefs();
+      if (vis != null) {
+        topKey = new Key(topKey.getRowData().toArray(), topKey.getColumnFamilyData().toArray(),
+            topKey.getColumnQualifierData().toArray(), vis, maxTS);
+      }
+      return vis;
+    }
+
+    private byte[] getVisFromRefs() throws IOException {
+      Text row = topKey.getRow();
+      if (lastRowVC.containsKey(row))
+        return lastRowVC.get(row);
+      Range range = new Range(row);
+      refsSource.seek(range, refsColf, true);
+      VisibilityCombiner vc = null;
+      while (refsSource.hasTop()) {
+        if (vc == null)
+          vc = new VisibilityCombiner();
+        vc.add(refsSource.getTopKey().getColumnVisibilityData());
+        refsSource.next();
+      }
+      if (vc == null) {
+        lastRowVC = Collections.singletonMap(row, null);
+        return null;
+      }
+      lastRowVC = Collections.singletonMap(row, vc.get());
+      return vc.get();
+    }
+
+    private String diffInfo(Value v1, Value v2) {
+      if (v1.getSize() != v2.getSize()) {
+        return "val len not equal " + v1.getSize() + "!=" + v2.getSize();
+      }
+
+      byte[] vb1 = v1.get();
+      byte[] vb2 = v2.get();
+
+      for (int i = 0; i < vb1.length; i++) {
+        if (vb1[i] != vb2[i]) {
+          return String.format("first diff at offset %,d 0x%02x != 0x%02x", i, 0xff & vb1[i],
+              0xff & vb2[i]);
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public Key getTopKey() {
+      return topKey;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return topValue;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      ChunkCombiner cc = new ChunkCombiner();
+      try {
+        cc.init(source.deepCopy(env), null, env);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return cc;
+    }
+  }
+
+  /**
+   * Copied from 2.0 Examples. A utility for merging visibilities into the form
+   * {@code (VIS1)|(VIS2)|...|(VISN)}. Used by the ChunkCombiner.
+   */
+  public static class VisibilityCombiner {
+
+    private TreeSet<String> visibilities = new TreeSet<>();
+
+    void add(ByteSequence cv) {
+      if (cv.length() == 0)
+        return;
+
+      int depth = 0;
+      int offset = 0;
+
+      for (int i = 0; i < cv.length(); i++) {
+        switch (cv.byteAt(i)) {
+          case '(':
+            depth++;
+            break;
+          case ')':
+            depth--;
+            if (depth < 0)
+              throw new IllegalArgumentException("Invalid vis " + cv);
+            break;
+          case '|':
+            if (depth == 0) {
+              insert(cv.subSequence(offset, i));
+              offset = i + 1;
+            }
+
+            break;
+        }
+      }
+
+      insert(cv.subSequence(offset, cv.length()));
+
+      if (depth != 0)
+        throw new IllegalArgumentException("Invalid vis " + cv);
+
+    }
+
+    private void insert(ByteSequence cv) {
+      for (int i = 0; i < cv.length(); i++) {
+
+      }
+
+      String cvs = cv.toString();
+
+      if (cvs.charAt(0) != '(')
+        cvs = "(" + cvs + ")";
+      else {
+        int depth = 0;
+        int depthZeroCloses = 0;
+        for (int i = 0; i < cv.length(); i++) {
+          switch (cv.byteAt(i)) {
+            case '(':
+              depth++;
+              break;
+            case ')':
+              depth--;
+              if (depth == 0)
+                depthZeroCloses++;
+              break;
+          }
+        }
+
+        if (depthZeroCloses > 1)
+          cvs = "(" + cvs + ")";
+      }
+
+      visibilities.add(cvs);
+    }
+
+    byte[] get() {
+      StringBuilder sb = new StringBuilder();
+      String sep = "";
+      for (String cvs : visibilities) {
+        sb.append(sep);
+        sep = "|";
+        sb.append(cvs);
+      }
+
+      return sb.toString().getBytes();

Review comment:
       Should be `getBytes(UTF_8)`
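       i.e. something like this (suggestion only; assumes a static import of `UTF_8`):
       ```java
       return sb.toString().getBytes(UTF_8);
       ```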




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#issuecomment-961963851


   @Manno15 @ctubbsii thanks for taking a look at this. I guess I should have been more explicit or made this a draft PR because a lot of the code in your feedback was just yanked from the examples repo, specifically the [ChunkCombiner](https://github.com/apache/accumulo-examples/blob/main/src/main/java/org/apache/accumulo/examples/filedata/ChunkCombiner.java) to try and simulate what Datawave is doing with iterators. I am not sure if I should clean it up or just stick it in a test jar so we don't have to look at it. 
   
   Do either of you know how to get MiniAccumuloCluster to load classes from a jar during runtime? I thought it might be a better test to do that but couldn't get it working.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#issuecomment-963427095


   > Changes look good, I don't fully know the impact of the interruptible changes but I didn't run into any issues while testing. Confirmed that these changes fix the reported issue.
   
   Thanks for testing these changes. Just curious, how did you reproduce the issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r745063876



##########
File path: core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SortedMapIterator.java
##########
@@ -121,4 +143,9 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
       IteratorEnvironment env) throws IOException {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public void setInterruptFlag(AtomicBoolean flag) {
+    this.interruptFlag = flag;
+  }

Review comment:
       My suggestion would be to use BooleanSupplier in the InterruptibleIterator interface, and call `setInterruptFlag(interruptFlag::get)` (where `interruptFlag` in the caller is of type `AtomicBoolean`). That would remove any confusion over implied atomicity guarantees inside the InterruptibleIterator interface, but wouldn't change any behavior. But, obviously, that would be a follow-on issue. It need not be done as part of this PR.
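       To make that concrete, a rough sketch of how it could look (illustrative names and signatures, not the actual follow-on change):
       ```java
       // illustrative only: InterruptibleIterator exposing a plain BooleanSupplier
       public interface InterruptibleIterator extends SortedKeyValueIterator<Key,Value> {
         void setInterruptFlag(BooleanSupplier interruptFlag);
       }

       // caller side keeps its AtomicBoolean; the iterator only sees a supplier
       AtomicBoolean interruptFlag = new AtomicBoolean(false);
       iter.setInterruptFlag(interruptFlag::get);
       ```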




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] Manno15 commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r743164169



##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {

Review comment:
       ```suggestion
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r745036356



##########
File path: core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SortedMapIterator.java
##########
@@ -121,4 +143,9 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
       IteratorEnvironment env) throws IOException {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public void setInterruptFlag(AtomicBoolean flag) {
+    this.interruptFlag = flag;
+  }

Review comment:
       I agree. I am hesitant to change the iterator internals, though, since they rely so heavily on runtime configuration (one of the reasons it took so long to find this bug in the first place), and I don't think we have adequate test coverage for their behavior.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r743731017



##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
##########
@@ -36,28 +37,40 @@
  *
  * Note that this class is intended as an in-memory replacement for RFile$Reader, so its behavior
  * reflects the same assumptions; namely, that this iterator is not responsible for respecting the
- * columnFamilies passed into seek().
+ * columnFamilies passed into seek(). If you want a Map-backed Iterator that returns only sought
+ * CFs, construct a new ColumnFamilySkippingIterator(new SortedMapIterator(map)).
+ *
+ * @see org.apache.accumulo.core.iterators.ColumnFamilySkippingIterator
  */
-public class SortedMapIterator implements SortedKeyValueIterator<Key,Value> {
+public class SortedMapIterator implements InterruptibleIterator {
   private Iterator<Entry<Key,Value>> iter;
   private Entry<Key,Value> entry;
 
   private SortedMap<Key,Value> map;
   private Range range;
 
+  private AtomicBoolean interruptFlag;

Review comment:
       We should be able to make this final with an initial value of `false`. I haven't really touched the iterators, since most of them have been around forever, but I think this would be a good fix. FYI, I had a hell of a time tracking down where we actually do the interrupt. As a reference, it seems like the only place we actually interrupt iterators is in `ScanDataSource`:
   https://github.com/apache/accumulo/blob/8a636a3ba91f5dae1d8b09b095178889a7d79c1d/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java#L256-L258
   When a tablet is closed:
   https://github.com/apache/accumulo/blob/b5ca15806659efbbd6f5d158be8648cc2cd13593/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java#L1288
   
   We have a ton of calls to pass the flag around between iterators, readers and RFile code but that seems to be the only place where the actual interrupt takes place.
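   For anyone trying to follow along, the hand-off is basically this (just a sketch of the pattern, not the actual ScanDataSource or Tablet code; names here are made up):
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   // Rough sketch of the interrupt hand-off described above.
   class InterruptHandOffSketch {
     private final AtomicBoolean interruptFlag = new AtomicBoolean(false);

     // What an InterruptibleIterator effectively does at the top of seek()/next().
     void checkInterrupt() {
       if (interruptFlag.get()) {
         throw new RuntimeException("scan interrupted"); // IterationInterruptedException in the real code
       }
     }

     // What the scan teardown / tablet close path effectively does.
     void interrupt() {
       interruptFlag.set(true);
     }
   }
   ```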

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilySkippingIterator.java
##########
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.iteratorsImpl.system;
+package org.apache.accumulo.core.iterators;

Review comment:
       That is what I had originally thought, but after investigating this bug it seems like `SortedMapIterator` and this one can be cast to an `InterruptibleIterator` in the `LocalityGroupIterator`:
   https://github.com/apache/accumulo/blob/8d33bc4d489880f15950189d20287262ce46e68e/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/LocalityGroupIterator.java#L49
   
   I think if a user extends one of them and configures it to run in certain situations, it could end up being created through deepCopy(). Either way it seems dangerous.
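   Roughly what I think goes wrong at runtime (simplified sketch, not the exact LocalityGroupIterator code; `map` and `interruptFlag` stand in for whatever the caller has in hand):
   ```java
   // Key, Value, SortedKeyValueIterator, SortedMapIterator and InterruptibleIterator are the
   // Accumulo core classes whose packages this PR moves around.
   SortedKeyValueIterator<Key,Value> reader = new SortedMapIterator(map);
   // After #1411, SortedMapIterator no longer implemented InterruptibleIterator, so a cast
   // like this fails with a ClassCastException during minc:
   ((InterruptibleIterator) reader).setInterruptFlag(interruptFlag);
   ```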

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilySkippingIterator.java
##########
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.iteratorsImpl.system;
+package org.apache.accumulo.core.iterators;

Review comment:
       Also, my original thinking in #1411 was that since SortedMapIterator is used so much in our tests, it would be useful to users as well. But I could be wrong about that. It could be unsafe to expose it to users, so a better option may be to move it in with the other system iterators. I can't remember whether I attempted this or how difficult it would be, though.

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
##########
@@ -36,28 +37,40 @@
  *
  * Note that this class is intended as an in-memory replacement for RFile$Reader, so its behavior
  * reflects the same assumptions; namely, that this iterator is not responsible for respecting the
- * columnFamilies passed into seek().
+ * columnFamilies passed into seek(). If you want a Map-backed Iterator that returns only sought
+ * CFs, construct a new ColumnFamilySkippingIterator(new SortedMapIterator(map)).
+ *
+ * @see org.apache.accumulo.core.iterators.ColumnFamilySkippingIterator
  */
-public class SortedMapIterator implements SortedKeyValueIterator<Key,Value> {
+public class SortedMapIterator implements InterruptibleIterator {
   private Iterator<Entry<Key,Value>> iter;
   private Entry<Key,Value> entry;
 
   private SortedMap<Key,Value> map;
   private Range range;
 
+  private AtomicBoolean interruptFlag;

Review comment:
       I thought about changing the `AtomicBoolean` to be final but now I am wondering if it was written this way with performance in mind. We do a lot of passing around of the flag across the iterator stack but only ever set it in one place.  If we were to make it final, then I think that would be a lot of extra memory accesses to the atomic value vs just passing around a reference.
   
   I thought of doing something like this in the setter, but like I said, I think it would be less efficient:
   <pre>
   public void setInterruptFlag(AtomicBoolean flag) {
       boolean newFlag = flag.get();
       boolean previousFlag = this.interruptFlag.getAndSet(newFlag);
       if (previousFlag == newFlag && newFlag)
         log.warn("Interrupt called on iterator that was previously interrupted.");
     }
   </pre>




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r743769254



##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
##########
@@ -36,28 +37,40 @@
  *
  * Note that this class is intended as an in-memory replacement for RFile$Reader, so its behavior
  * reflects the same assumptions; namely, that this iterator is not responsible for respecting the
- * columnFamilies passed into seek().
+ * columnFamilies passed into seek(). If you want a Map-backed Iterator that returns only sought
+ * CFs, construct a new ColumnFamilySkippingIterator(new SortedMapIterator(map)).
+ *
+ * @see org.apache.accumulo.core.iterators.ColumnFamilySkippingIterator
  */
-public class SortedMapIterator implements SortedKeyValueIterator<Key,Value> {
+public class SortedMapIterator implements InterruptibleIterator {
   private Iterator<Entry<Key,Value>> iter;
   private Entry<Key,Value> entry;
 
   private SortedMap<Key,Value> map;
   private Range range;
 
+  private AtomicBoolean interruptFlag;

Review comment:
       I thought about changing the `AtomicBoolean` to be final but now I am wondering if it was written this way with performance in mind. We do a lot of passing around of the flag across the iterator stack but only ever set it in one place.  If we were to make it final, then I think that would be a lot of extra memory accesses to the atomic value vs just passing around a reference.
   
   I thought of doing something like this in the setter, but like I said, I think it would be less efficient:
   <pre>
   public void setInterruptFlag(AtomicBoolean flag) {
       boolean newFlag = flag.get();
       boolean previousFlag = this.interruptFlag.getAndSet(newFlag);
       if (previousFlag == newFlag && newFlag)
         log.warn("Interrupt called on iterator that was previously interrupted.");
     }
   </pre>




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] Manno15 commented on pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
Manno15 commented on pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#issuecomment-963430813


   > Thanks for testing these changes. Just curious, how did you reproduce the issue?
   
   Using the IT you created. So maybe that isn't a definitive test, but it did fail against main and passes with the rest of these changes applied.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r744984895



##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/IterationInterruptedException.java
##########
@@ -18,6 +18,9 @@
  */
 package org.apache.accumulo.core.iterators;
 
+/**
+ * Exception thrown if an interrupt flag is detected.
+ */
 public class IterationInterruptedException extends RuntimeException {

Review comment:
       Should this RTE exist in public API, or should it be in iteratorsImpl as well?

##########
File path: core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SortedMapIterator.java
##########
@@ -121,4 +143,9 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
       IteratorEnvironment env) throws IOException {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public void setInterruptFlag(AtomicBoolean flag) {
+    this.interruptFlag = flag;
+  }

Review comment:
       Because changing the reference breaks atomicity guarantees, this method looks dubious. It looks like it is intended to be used like a BooleanSupplier (or even more strictly, a one-way latch that can move from false to true, but never back to false). I'm not sure of the right solution, but if the interruptible stuff is only on the server side, it could be cleaned up to avoid using AtomicBoolean and use something more appropriate instead.
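   To sketch the one-way latch idea (class name and placement here are made up, not a concrete proposal):
   ```java
   import java.util.function.BooleanSupplier;

   // Sketch of the "one-way latch" idea described above.
   final class InterruptLatch implements BooleanSupplier {
     private volatile boolean interrupted = false;

     // can only move from false to true, never back
     public void interrupt() {
       interrupted = true;
     }

     @Override
     public boolean getAsBoolean() {
       return interrupted;
     }
   }
   ```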




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r745061737



##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
##########
@@ -36,28 +37,40 @@
  *
  * Note that this class is intended as an in-memory replacement for RFile$Reader, so its behavior
  * reflects the same assumptions; namely, that this iterator is not responsible for respecting the
- * columnFamilies passed into seek().
+ * columnFamilies passed into seek(). If you want a Map-backed Iterator that returns only sought
+ * CFs, construct a new ColumnFamilySkippingIterator(new SortedMapIterator(map)).
+ *
+ * @see org.apache.accumulo.core.iterators.ColumnFamilySkippingIterator
  */
-public class SortedMapIterator implements SortedKeyValueIterator<Key,Value> {
+public class SortedMapIterator implements InterruptibleIterator {
   private Iterator<Entry<Key,Value>> iter;
   private Entry<Key,Value> entry;
 
   private SortedMap<Key,Value> map;
   private Range range;
 
+  private AtomicBoolean interruptFlag;

Review comment:
       It's not a performance thing. It's an implementation injection mechanism (see where we're notifying the iterators by changing the value in the scan code: `grep -lR 'interruptFlag.set(true)' .`). AtomicBoolean is being used like a BooleanSupplier. This was probably fine at one point, since BooleanSupplier was introduced in Java 8, and before that, AtomicBoolean was an attractive alternative to a built-in supplier. But having the type be AtomicBoolean implies atomicity, and that can be confusing, since the atomicity guarantees are clearly broken by having a setter that can swap out the reference itself in addition to the value. We're not actually relying on the atomicity guarantees in these cases, though, as far as I can tell, so BooleanSupplier would suffice as the type.
   
   Your code above wouldn't work. I had a similar idea initially (`this.interruptFlag.set(flag.get())`). However, it's not sufficient to set the value. We need to set the containing object reference, so we can communicate the change in value when the caller updates the source.
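   Something like this on the iterator side would express that contract (sketch only, class name is made up; swapping the reference stays as the injection mechanism):
   ```java
   import java.util.function.BooleanSupplier;

   // Sketch of the read-only view an iterator actually needs.
   class InterruptCheckSketch {
     private volatile BooleanSupplier interruptFlag = () -> false;

     public void setInterruptFlag(BooleanSupplier flag) {
       this.interruptFlag = flag; // reference swap; the iterator only ever reads
     }

     void checkInterrupt() {
       if (interruptFlag.getAsBoolean()) {
         throw new RuntimeException("interrupted"); // IterationInterruptedException in the real code
       }
     }
   }
   ```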




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r743166509



##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/InterruptibleIterator.java
##########
@@ -16,14 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.iteratorsImpl.system;
+package org.apache.accumulo.core.iterators;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 
+/**
+ * Allows an iterator to be interrupted. Typically, once the interrupt flag is set the iterator will
+ * throw an {@link InterruptedException} if the interrupt is detected. Some iterators have been
+ * optimized to not always check the flag.
+ *
+ * One example of a system interrupt is when a Tablet is being closed. If a Tablet has an active

Review comment:
       Blank line in javadoc without a `<p>` isn't correct
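   i.e., it needs the explicit tag to start the second paragraph, e.g.:
   ```java
   /**
    * Allows an iterator to be interrupted. ...
    * <p>
    * One example of a system interrupt is when a Tablet is being closed. ...
    */
   ```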

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
##########
@@ -36,28 +37,40 @@
  *
  * Note that this class is intended as an in-memory replacement for RFile$Reader, so its behavior
  * reflects the same assumptions; namely, that this iterator is not responsible for respecting the
- * columnFamilies passed into seek().
+ * columnFamilies passed into seek(). If you want a Map-backed Iterator that returns only sought
+ * CFs, construct a new ColumnFamilySkippingIterator(new SortedMapIterator(map)).
+ *
+ * @see org.apache.accumulo.core.iterators.ColumnFamilySkippingIterator
  */
-public class SortedMapIterator implements SortedKeyValueIterator<Key,Value> {
+public class SortedMapIterator implements InterruptibleIterator {
   private Iterator<Entry<Key,Value>> iter;
   private Entry<Key,Value> entry;
 
   private SortedMap<Key,Value> map;
   private Range range;
 
+  private AtomicBoolean interruptFlag;

Review comment:
       Atomic references are typically `final` whenever possible, to prevent bugs where atomicity is broken because the reference is changed. I see this can't be made final because there's a setter... but that seems like bad API design. Is there something we can do to fix this?

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }

Review comment:
       Our test file is small enough that this can be reduced to two simple lines... one to read the file fully into a String, and another to compute the digest from the String's bytes. This would improve the comprehensibility of the test, since we don't actually need to chunk and buffer the tiny test file.
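   For example, roughly (sketch only, Java 11+; `filename`, `md5digest`, and `hexString` are the existing fields/helpers in this test, and it assumes `java.nio.file.Files`, `java.nio.file.Path`, and `StandardCharsets.UTF_8` imports):
   ```java
   // Read the whole (small) test file, then hash its bytes.
   String contents = Files.readString(Path.of(filename));
   String hash = hexString(md5digest.digest(contents.getBytes(UTF_8)));
   ```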

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+
+      String hash = hexString(md5digest.digest());
+      Text row = new Text(hash);
+
+      // write info to accumulo
+      Mutation m = new Mutation(row);
+      m.put(REFS_CF, buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+      String fext = getExt(filename);
+      if (fext != null)
+        m.put(REFS_CF, buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+      bw.addMutation(m);
+
+      // read through file again, writing chunks to accumulo
+      int chunkCount = 0;
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          while (numRead < buf.length) {
+            int moreRead = fis.read(buf, numRead, buf.length - numRead);
+            if (moreRead > 0)
+              numRead += moreRead;
+            else if (moreRead < 0)
+              break;
+          }
+          m = new Mutation(row);
+          Text chunkCQ = new Text(chunkSizeBytes);
+          chunkCQ.append(intToBytes(chunkCount), 0, 4);
+          m.put(CHUNK_CF, chunkCQ, cv, new Value(buf, 0, numRead));
+          bw.addMutation(m);
+          if (chunkCount == Integer.MAX_VALUE)
+            throw new RuntimeException(
+                "too many chunks for file " + filename + ", try raising chunk size");
+          chunkCount++;
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+      m = new Mutation(row);
+      Text chunkCQ = new Text(chunkSizeBytes);
+      chunkCQ.append(intToBytes(chunkCount), 0, 4);
+      m.put(new Text(CHUNK_CF), chunkCQ, cv, new Value(new byte[0]));
+      bw.addMutation(m);
+      return hash;
+    }
+
+    public static byte[] intToBytes(int l) {
+      byte[] b = new byte[4];
+      b[0] = (byte) (l >>> 24);
+      b[1] = (byte) (l >>> 16);
+      b[2] = (byte) (l >>> 8);
+      b[3] = (byte) (l >>> 0);
+      return b;
+    }
+
+    private static String getExt(String filename) {
+      if (filename.indexOf(".") == -1)
+        return null;
+      return filename.substring(filename.lastIndexOf(".") + 1);
+    }
+
+    public String hexString(byte[] bytes) {
+      StringBuilder sb = new StringBuilder();
+      for (byte b : bytes) {
+        sb.append(String.format("%02x", b));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * This iterator dedupes chunks and sets their visibilities to the combined visibility of the refs
+   * columns. For example, it would combine
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 A&amp;B V1
+   *    row1 ~chunk 0 C&amp;D V1
+   *    row1 ~chunk 0 E&amp;F V1
+   *    row1 ~chunk 0 G&amp;H V1
+   * </pre>
+   *
+   * into the following
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 (A&amp;B)|(C&amp;D) V1
+   * </pre>
+   *
+   * {@link VisibilityCombiner} is used to combine the visibilities.
+   */
+  public static class ChunkCombiner implements SortedKeyValueIterator<Key,Value> {
+    private static final Logger log = LoggerFactory.getLogger(ChunkCombiner.class);
+
+    private SortedKeyValueIterator<Key,Value> source;
+    private SortedKeyValueIterator<Key,Value> refsSource;
+    private static final Collection<ByteSequence> refsColf =
+        Collections.singleton(FileDataIngest.REFS_CF_BS);
+    private Map<Text,byte[]> lastRowVC = Collections.emptyMap();
+
+    private Key topKey = null;
+    private Value topValue = null;
+
+    public ChunkCombiner() {}
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      this.source = source;
+      this.refsSource = source.deepCopy(env);
+      log.info("DUDE call init. deepCopy on {}", source.getClass());
+    }
+
+    @Override
+    public boolean hasTop() {
+      return topKey != null;
+    }
+
+    @Override
+    public void next() throws IOException {
+      findTop();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+        throws IOException {
+      source.seek(range, columnFamilies, inclusive);
+      findTop();
+    }
+
+    private void findTop() throws IOException {
+      do {
+        topKey = null;
+        topValue = null;
+      } while (source.hasTop() && _findTop() == null);
+    }
+
+    private byte[] _findTop() throws IOException {
+      long maxTS;
+
+      topKey = new Key(source.getTopKey());
+      topValue = new Value(source.getTopValue());
+      source.next();
+
+      if (!topKey.getColumnFamilyData().equals(FileDataIngest.CHUNK_CF_BS))
+        return topKey.getColumnVisibility().getBytes();
+
+      maxTS = topKey.getTimestamp();
+
+      while (source.hasTop() && source.getTopKey().equals(topKey, PartialKey.ROW_COLFAM_COLQUAL)) {
+        if (source.getTopKey().getTimestamp() > maxTS)
+          maxTS = source.getTopKey().getTimestamp();
+
+        if (!topValue.equals(source.getTopValue()))
+          throw new RuntimeException("values not equals " + topKey + " " + source.getTopKey()
+              + " : " + diffInfo(topValue, source.getTopValue()));
+
+        source.next();
+      }
+
+      byte[] vis = getVisFromRefs();
+      if (vis != null) {
+        topKey = new Key(topKey.getRowData().toArray(), topKey.getColumnFamilyData().toArray(),
+            topKey.getColumnQualifierData().toArray(), vis, maxTS);
+      }
+      return vis;
+    }
+
+    private byte[] getVisFromRefs() throws IOException {
+      Text row = topKey.getRow();
+      if (lastRowVC.containsKey(row))
+        return lastRowVC.get(row);
+      Range range = new Range(row);
+      refsSource.seek(range, refsColf, true);
+      VisibilityCombiner vc = null;
+      while (refsSource.hasTop()) {
+        if (vc == null)
+          vc = new VisibilityCombiner();
+        vc.add(refsSource.getTopKey().getColumnVisibilityData());
+        refsSource.next();
+      }
+      if (vc == null) {
+        lastRowVC = Collections.singletonMap(row, null);
+        return null;
+      }
+      lastRowVC = Collections.singletonMap(row, vc.get());
+      return vc.get();
+    }
+
+    private String diffInfo(Value v1, Value v2) {
+      if (v1.getSize() != v2.getSize()) {
+        return "val len not equal " + v1.getSize() + "!=" + v2.getSize();
+      }
+
+      byte[] vb1 = v1.get();
+      byte[] vb2 = v2.get();
+
+      for (int i = 0; i < vb1.length; i++) {
+        if (vb1[i] != vb2[i]) {
+          return String.format("first diff at offset %,d 0x%02x != 0x%02x", i, 0xff & vb1[i],
+              0xff & vb2[i]);
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public Key getTopKey() {
+      return topKey;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return topValue;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      ChunkCombiner cc = new ChunkCombiner();
+      try {
+        cc.init(source.deepCopy(env), null, env);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return cc;
+    }
+  }
+
+  /**
+   * Copied from 2.0 Examples. A utility for merging visibilities into the form
+   * {@code (VIS1)|(VIS2)|...|(VISN)}. Used by the ChunkCombiner.
+   */
+  public static class VisibilityCombiner {
+
+    private TreeSet<String> visibilities = new TreeSet<>();
+
+    void add(ByteSequence cv) {
+      if (cv.length() == 0)
+        return;
+
+      int depth = 0;
+      int offset = 0;
+
+      for (int i = 0; i < cv.length(); i++) {
+        switch (cv.byteAt(i)) {
+          case '(':
+            depth++;
+            break;
+          case ')':
+            depth--;
+            if (depth < 0)
+              throw new IllegalArgumentException("Invalid vis " + cv);
+            break;
+          case '|':
+            if (depth == 0) {
+              insert(cv.subSequence(offset, i));
+              offset = i + 1;
+            }
+
+            break;
+        }
+      }
+
+      insert(cv.subSequence(offset, cv.length()));
+
+      if (depth != 0)
+        throw new IllegalArgumentException("Invalid vis " + cv);
+
+    }
+
+    private void insert(ByteSequence cv) {
+      for (int i = 0; i < cv.length(); i++) {
+
+      }
+
+      String cvs = cv.toString();
+
+      if (cvs.charAt(0) != '(')
+        cvs = "(" + cvs + ")";
+      else {
+        int depth = 0;
+        int depthZeroCloses = 0;
+        for (int i = 0; i < cv.length(); i++) {
+          switch (cv.byteAt(i)) {
+            case '(':
+              depth++;
+              break;
+            case ')':
+              depth--;
+              if (depth == 0)
+                depthZeroCloses++;
+              break;
+          }
+        }
+
+        if (depthZeroCloses > 1)
+          cvs = "(" + cvs + ")";
+      }
+
+      visibilities.add(cvs);
+    }
+
+    byte[] get() {
+      StringBuilder sb = new StringBuilder();
+      String sep = "";
+      for (String cvs : visibilities) {
+        sb.append(sep);
+        sep = "|";
+        sb.append(cvs);
+      }
+
+      return sb.toString().getBytes();
+    }
+  }
+
+  public static final byte[] nullbyte = new byte[] {0};
+
+  /**
+   * Join some number of strings using a null byte separator into a text object.
+   *
+   * @param s
+   *          strings
+   * @return a text object containing the strings separated by null bytes
+   */

Review comment:
       This could probably just be a one-line inline comment. We're not publishing javadocs for ITs. This method can probably also be private... as could some others added in this file for testing.

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
##########
@@ -95,6 +111,9 @@ public void next() throws IOException {
   public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
       throws IOException {
 
+    if (interruptFlag != null && interruptFlag.get())

Review comment:
       `interruptFlag` should never be null. We can require non-null anywhere we accept it.
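   e.g., a sketch of requiring non-null in the setter (assumes a `java.util.Objects` import):
   ```java
   // Sketch only: fail fast on a null flag instead of null-checking in seek()/next().
   @Override
   public void setInterruptFlag(AtomicBoolean flag) {
     this.interruptFlag = Objects.requireNonNull(flag, "interruptFlag must not be null");
   }
   ```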

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+
+      String hash = hexString(md5digest.digest());
+      Text row = new Text(hash);
+
+      // write info to accumulo
+      Mutation m = new Mutation(row);
+      m.put(REFS_CF, buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+      String fext = getExt(filename);
+      if (fext != null)
+        m.put(REFS_CF, buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+      bw.addMutation(m);
+
+      // read through file again, writing chunks to accumulo
+      int chunkCount = 0;
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          while (numRead < buf.length) {
+            int moreRead = fis.read(buf, numRead, buf.length - numRead);
+            if (moreRead > 0)
+              numRead += moreRead;
+            else if (moreRead < 0)
+              break;
+          }
+          m = new Mutation(row);
+          Text chunkCQ = new Text(chunkSizeBytes);
+          chunkCQ.append(intToBytes(chunkCount), 0, 4);
+          m.put(CHUNK_CF, chunkCQ, cv, new Value(buf, 0, numRead));
+          bw.addMutation(m);
+          if (chunkCount == Integer.MAX_VALUE)
+            throw new RuntimeException(
+                "too many chunks for file " + filename + ", try raising chunk size");
+          chunkCount++;
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+      m = new Mutation(row);
+      Text chunkCQ = new Text(chunkSizeBytes);
+      chunkCQ.append(intToBytes(chunkCount), 0, 4);
+      m.put(new Text(CHUNK_CF), chunkCQ, cv, new Value(new byte[0]));
+      bw.addMutation(m);
+      return hash;
+    }
+
+    public static byte[] intToBytes(int l) {
+      byte[] b = new byte[4];
+      b[0] = (byte) (l >>> 24);
+      b[1] = (byte) (l >>> 16);
+      b[2] = (byte) (l >>> 8);
+      b[3] = (byte) (l >>> 0);
+      return b;
+    }
+
+    private static String getExt(String filename) {
+      if (filename.indexOf(".") == -1)
+        return null;
+      return filename.substring(filename.lastIndexOf(".") + 1);
+    }
+
+    public String hexString(byte[] bytes) {
+      StringBuilder sb = new StringBuilder();
+      for (byte b : bytes) {
+        sb.append(String.format("%02x", b));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * This iterator dedupes chunks and sets their visibilities to the combined visibility of the refs
+   * columns. For example, it would combine
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 A&amp;B V1
+   *    row1 ~chunk 0 C&amp;D V1
+   *    row1 ~chunk 0 E&amp;F V1
+   *    row1 ~chunk 0 G&amp;H V1
+   * </pre>
+   *
+   * into the following
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 (A&amp;B)|(C&amp;D) V1
+   * </pre>
+   *
+   * {@link VisibilityCombiner} is used to combine the visibilities.
+   */
+  public static class ChunkCombiner implements SortedKeyValueIterator<Key,Value> {
+    private static final Logger log = LoggerFactory.getLogger(ChunkCombiner.class);
+
+    private SortedKeyValueIterator<Key,Value> source;
+    private SortedKeyValueIterator<Key,Value> refsSource;
+    private static final Collection<ByteSequence> refsColf =
+        Collections.singleton(FileDataIngest.REFS_CF_BS);
+    private Map<Text,byte[]> lastRowVC = Collections.emptyMap();
+
+    private Key topKey = null;
+    private Value topValue = null;
+
+    public ChunkCombiner() {}
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      this.source = source;
+      this.refsSource = source.deepCopy(env);
+      log.info("DUDE call init. deepCopy on {}", source.getClass());
+    }
+
+    @Override
+    public boolean hasTop() {
+      return topKey != null;
+    }
+
+    @Override
+    public void next() throws IOException {
+      findTop();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+        throws IOException {
+      source.seek(range, columnFamilies, inclusive);
+      findTop();
+    }
+
+    private void findTop() throws IOException {
+      do {
+        topKey = null;
+        topValue = null;
+      } while (source.hasTop() && _findTop() == null);
+    }
+
+    private byte[] _findTop() throws IOException {
+      long maxTS;
+
+      topKey = new Key(source.getTopKey());
+      topValue = new Value(source.getTopValue());
+      source.next();
+
+      if (!topKey.getColumnFamilyData().equals(FileDataIngest.CHUNK_CF_BS))
+        return topKey.getColumnVisibility().getBytes();
+
+      maxTS = topKey.getTimestamp();
+
+      while (source.hasTop() && source.getTopKey().equals(topKey, PartialKey.ROW_COLFAM_COLQUAL)) {
+        if (source.getTopKey().getTimestamp() > maxTS)
+          maxTS = source.getTopKey().getTimestamp();
+
+        if (!topValue.equals(source.getTopValue()))
+          throw new RuntimeException("values not equals " + topKey + " " + source.getTopKey()
+              + " : " + diffInfo(topValue, source.getTopValue()));
+
+        source.next();
+      }
+
+      byte[] vis = getVisFromRefs();
+      if (vis != null) {
+        topKey = new Key(topKey.getRowData().toArray(), topKey.getColumnFamilyData().toArray(),
+            topKey.getColumnQualifierData().toArray(), vis, maxTS);
+      }
+      return vis;
+    }
+
+    private byte[] getVisFromRefs() throws IOException {
+      Text row = topKey.getRow();
+      if (lastRowVC.containsKey(row))
+        return lastRowVC.get(row);
+      Range range = new Range(row);
+      refsSource.seek(range, refsColf, true);
+      VisibilityCombiner vc = null;
+      while (refsSource.hasTop()) {
+        if (vc == null)
+          vc = new VisibilityCombiner();
+        vc.add(refsSource.getTopKey().getColumnVisibilityData());
+        refsSource.next();
+      }
+      if (vc == null) {
+        lastRowVC = Collections.singletonMap(row, null);
+        return null;
+      }
+      lastRowVC = Collections.singletonMap(row, vc.get());
+      return vc.get();
+    }
+
+    private String diffInfo(Value v1, Value v2) {
+      if (v1.getSize() != v2.getSize()) {
+        return "val len not equal " + v1.getSize() + "!=" + v2.getSize();
+      }
+
+      byte[] vb1 = v1.get();
+      byte[] vb2 = v2.get();
+
+      for (int i = 0; i < vb1.length; i++) {
+        if (vb1[i] != vb2[i]) {
+          return String.format("first diff at offset %,d 0x%02x != 0x%02x", i, 0xff & vb1[i],
+              0xff & vb2[i]);
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public Key getTopKey() {
+      return topKey;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return topValue;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      ChunkCombiner cc = new ChunkCombiner();
+      try {
+        cc.init(source.deepCopy(env), null, env);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return cc;
+    }
+  }
+
+  /**
+   * Copied from 2.0 Examples. A utility for merging visibilities into the form
+   * {@code (VIS1)|(VIS2)|...|(VISN)}. Used by the ChunkCombiner.
+   */
+  public static class VisibilityCombiner {
+
+    private TreeSet<String> visibilities = new TreeSet<>();
+
+    void add(ByteSequence cv) {
+      if (cv.length() == 0)
+        return;
+
+      int depth = 0;
+      int offset = 0;
+
+      for (int i = 0; i < cv.length(); i++) {
+        switch (cv.byteAt(i)) {
+          case '(':
+            depth++;
+            break;
+          case ')':
+            depth--;
+            if (depth < 0)
+              throw new IllegalArgumentException("Invalid vis " + cv);
+            break;
+          case '|':
+            if (depth == 0) {
+              insert(cv.subSequence(offset, i));
+              offset = i + 1;
+            }
+
+            break;
+        }
+      }
+
+      insert(cv.subSequence(offset, cv.length()));
+
+      if (depth != 0)
+        throw new IllegalArgumentException("Invalid vis " + cv);
+
+    }
+
+    private void insert(ByteSequence cv) {
+      for (int i = 0; i < cv.length(); i++) {
+
+      }

Review comment:
       What is this loop doing?

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilySkippingIterator.java
##########
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.iteratorsImpl.system;
+package org.apache.accumulo.core.iterators;

Review comment:
       Moving the interface back to public API seems like the obvious fix, but are we expecting users to actually use this interface? I haven't followed all the details, but my understanding was that this was a feature intended solely for internal use with some of our own system iterators. But, maybe I'm wrong? Is this something that is useful to user iterators and safe for them to use?

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+
+      String hash = hexString(md5digest.digest());
+      Text row = new Text(hash);
+
+      // write info to accumulo
+      Mutation m = new Mutation(row);
+      m.put(REFS_CF, buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+      String fext = getExt(filename);
+      if (fext != null)
+        m.put(REFS_CF, buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+      bw.addMutation(m);
+
+      // read through file again, writing chunks to accumulo
+      int chunkCount = 0;
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          while (numRead < buf.length) {
+            int moreRead = fis.read(buf, numRead, buf.length - numRead);
+            if (moreRead > 0)
+              numRead += moreRead;
+            else if (moreRead < 0)
+              break;
+          }
+          m = new Mutation(row);
+          Text chunkCQ = new Text(chunkSizeBytes);
+          chunkCQ.append(intToBytes(chunkCount), 0, 4);
+          m.put(CHUNK_CF, chunkCQ, cv, new Value(buf, 0, numRead));
+          bw.addMutation(m);
+          if (chunkCount == Integer.MAX_VALUE)
+            throw new RuntimeException(
+                "too many chunks for file " + filename + ", try raising chunk size");
+          chunkCount++;
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+      m = new Mutation(row);
+      Text chunkCQ = new Text(chunkSizeBytes);
+      chunkCQ.append(intToBytes(chunkCount), 0, 4);
+      m.put(new Text(CHUNK_CF), chunkCQ, cv, new Value(new byte[0]));
+      bw.addMutation(m);
+      return hash;
+    }
+
+    public static byte[] intToBytes(int l) {
+      byte[] b = new byte[4];
+      b[0] = (byte) (l >>> 24);
+      b[1] = (byte) (l >>> 16);
+      b[2] = (byte) (l >>> 8);
+      b[3] = (byte) (l >>> 0);
+      return b;
+    }
+
+    private static String getExt(String filename) {
+      if (filename.indexOf(".") == -1)
+        return null;
+      return filename.substring(filename.lastIndexOf(".") + 1);
+    }
+
+    public String hexString(byte[] bytes) {
+      StringBuilder sb = new StringBuilder();
+      for (byte b : bytes) {
+        sb.append(String.format("%02x", b));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * This iterator dedupes chunks and sets their visibilities to the combined visibility of the refs
+   * columns. For example, it would combine
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 A&amp;B V1
+   *    row1 ~chunk 0 C&amp;D V1
+   *    row1 ~chunk 0 E&amp;F V1
+   *    row1 ~chunk 0 G&amp;H V1
+   * </pre>
+   *
+   * into the following
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 (A&amp;B)|(C&amp;D) V1
+   * </pre>
+   *
+   * {@link VisibilityCombiner} is used to combine the visibilities.
+   */
+  public static class ChunkCombiner implements SortedKeyValueIterator<Key,Value> {
+    private static final Logger log = LoggerFactory.getLogger(ChunkCombiner.class);
+
+    private SortedKeyValueIterator<Key,Value> source;
+    private SortedKeyValueIterator<Key,Value> refsSource;
+    private static final Collection<ByteSequence> refsColf =
+        Collections.singleton(FileDataIngest.REFS_CF_BS);
+    private Map<Text,byte[]> lastRowVC = Collections.emptyMap();
+
+    private Key topKey = null;
+    private Value topValue = null;
+
+    public ChunkCombiner() {}
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      this.source = source;
+      this.refsSource = source.deepCopy(env);
+      log.info("DUDE call init. deepCopy on {}", source.getClass());
+    }
+
+    @Override
+    public boolean hasTop() {
+      return topKey != null;
+    }
+
+    @Override
+    public void next() throws IOException {
+      findTop();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+        throws IOException {
+      source.seek(range, columnFamilies, inclusive);
+      findTop();
+    }
+
+    private void findTop() throws IOException {
+      do {
+        topKey = null;
+        topValue = null;
+      } while (source.hasTop() && _findTop() == null);
+    }
+
+    private byte[] _findTop() throws IOException {
+      long maxTS;
+
+      topKey = new Key(source.getTopKey());
+      topValue = new Value(source.getTopValue());
+      source.next();
+
+      if (!topKey.getColumnFamilyData().equals(FileDataIngest.CHUNK_CF_BS))
+        return topKey.getColumnVisibility().getBytes();
+
+      maxTS = topKey.getTimestamp();
+
+      while (source.hasTop() && source.getTopKey().equals(topKey, PartialKey.ROW_COLFAM_COLQUAL)) {
+        if (source.getTopKey().getTimestamp() > maxTS)
+          maxTS = source.getTopKey().getTimestamp();
+
+        if (!topValue.equals(source.getTopValue()))
+          throw new RuntimeException("values not equals " + topKey + " " + source.getTopKey()
+              + " : " + diffInfo(topValue, source.getTopValue()));
+
+        source.next();
+      }
+
+      byte[] vis = getVisFromRefs();
+      if (vis != null) {
+        topKey = new Key(topKey.getRowData().toArray(), topKey.getColumnFamilyData().toArray(),
+            topKey.getColumnQualifierData().toArray(), vis, maxTS);
+      }
+      return vis;
+    }
+
+    private byte[] getVisFromRefs() throws IOException {
+      Text row = topKey.getRow();
+      if (lastRowVC.containsKey(row))
+        return lastRowVC.get(row);
+      Range range = new Range(row);
+      refsSource.seek(range, refsColf, true);
+      VisibilityCombiner vc = null;
+      while (refsSource.hasTop()) {
+        if (vc == null)
+          vc = new VisibilityCombiner();
+        vc.add(refsSource.getTopKey().getColumnVisibilityData());
+        refsSource.next();
+      }
+      if (vc == null) {
+        lastRowVC = Collections.singletonMap(row, null);
+        return null;
+      }
+      lastRowVC = Collections.singletonMap(row, vc.get());
+      return vc.get();
+    }
+
+    private String diffInfo(Value v1, Value v2) {
+      if (v1.getSize() != v2.getSize()) {
+        return "val len not equal " + v1.getSize() + "!=" + v2.getSize();
+      }
+
+      byte[] vb1 = v1.get();
+      byte[] vb2 = v2.get();
+
+      for (int i = 0; i < vb1.length; i++) {
+        if (vb1[i] != vb2[i]) {
+          return String.format("first diff at offset %,d 0x%02x != 0x%02x", i, 0xff & vb1[i],
+              0xff & vb2[i]);
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public Key getTopKey() {
+      return topKey;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return topValue;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      ChunkCombiner cc = new ChunkCombiner();
+      try {
+        cc.init(source.deepCopy(env), null, env);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return cc;
+    }
+  }
+
+  /**
+   * Copied from 2.0 Examples. A utility for merging visibilities into the form
+   * {@code (VIS1)|(VIS2)|...|(VISN)}. Used by the ChunkCombiner.
+   */
+  public static class VisibilityCombiner {
+
+    private TreeSet<String> visibilities = new TreeSet<>();
+
+    void add(ByteSequence cv) {
+      if (cv.length() == 0)
+        return;
+
+      int depth = 0;
+      int offset = 0;
+
+      for (int i = 0; i < cv.length(); i++) {
+        switch (cv.byteAt(i)) {
+          case '(':
+            depth++;
+            break;
+          case ')':
+            depth--;
+            if (depth < 0)
+              throw new IllegalArgumentException("Invalid vis " + cv);
+            break;
+          case '|':
+            if (depth == 0) {
+              insert(cv.subSequence(offset, i));
+              offset = i + 1;
+            }
+
+            break;
+        }
+      }
+
+      insert(cv.subSequence(offset, cv.length()));
+
+      if (depth != 0)
+        throw new IllegalArgumentException("Invalid vis " + cv);
+
+    }
+
+    private void insert(ByteSequence cv) {
+      for (int i = 0; i < cv.length(); i++) {
+
+      }
+
+      String cvs = cv.toString();
+
+      if (cvs.charAt(0) != '(')
+        cvs = "(" + cvs + ")";
+      else {
+        int depth = 0;
+        int depthZeroCloses = 0;
+        for (int i = 0; i < cv.length(); i++) {
+          switch (cv.byteAt(i)) {
+            case '(':
+              depth++;
+              break;
+            case ')':
+              depth--;
+              if (depth == 0)
+                depthZeroCloses++;
+              break;
+          }
+        }
+
+        if (depthZeroCloses > 1)
+          cvs = "(" + cvs + ")";
+      }
+
+      visibilities.add(cvs);
+    }
+
+    byte[] get() {
+      StringBuilder sb = new StringBuilder();
+      String sep = "";
+      for (String cvs : visibilities) {
+        sb.append(sep);
+        sep = "|";
+        sb.append(cvs);
+      }
+
+      return sb.toString().getBytes();
+    }
+  }
+
+  public static final byte[] nullbyte = new byte[] {0};
+
+  /**
+   * Join some number of strings using a null byte separator into a text object.
+   *
+   * @param s
+   *          strings
+   * @return a text object containing the strings separated by null bytes
+   */
+  public static Text buildNullSepText(String... s) {
+    Text t = new Text(s[0]);
+    for (int i = 1; i < s.length; i++) {
+      t.append(nullbyte, 0, 1);
+      t.append(s[i].getBytes(), 0, s[i].length());

Review comment:
       should be `getBytes(UTF_8)`
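   A minimal sketch of the suggested fix (assuming a static import of java.nio.charset.StandardCharsets.UTF_8); using the byte array's length also avoids mixing up character count and byte count:
   <pre>
   byte[] bytes = s[i].getBytes(UTF_8);
   t.append(bytes, 0, bytes.length);
   </pre>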

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+
+      String hash = hexString(md5digest.digest());
+      Text row = new Text(hash);
+
+      // write info to accumulo
+      Mutation m = new Mutation(row);
+      m.put(REFS_CF, buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+      String fext = getExt(filename);
+      if (fext != null)
+        m.put(REFS_CF, buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+      bw.addMutation(m);
+
+      // read through file again, writing chunks to accumulo
+      int chunkCount = 0;
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          while (numRead < buf.length) {
+            int moreRead = fis.read(buf, numRead, buf.length - numRead);
+            if (moreRead > 0)
+              numRead += moreRead;
+            else if (moreRead < 0)
+              break;
+          }
+          m = new Mutation(row);
+          Text chunkCQ = new Text(chunkSizeBytes);
+          chunkCQ.append(intToBytes(chunkCount), 0, 4);
+          m.put(CHUNK_CF, chunkCQ, cv, new Value(buf, 0, numRead));
+          bw.addMutation(m);
+          if (chunkCount == Integer.MAX_VALUE)
+            throw new RuntimeException(
+                "too many chunks for file " + filename + ", try raising chunk size");
+          chunkCount++;
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }
+      m = new Mutation(row);
+      Text chunkCQ = new Text(chunkSizeBytes);
+      chunkCQ.append(intToBytes(chunkCount), 0, 4);
+      m.put(new Text(CHUNK_CF), chunkCQ, cv, new Value(new byte[0]));
+      bw.addMutation(m);
+      return hash;
+    }
+
+    public static byte[] intToBytes(int l) {
+      byte[] b = new byte[4];
+      b[0] = (byte) (l >>> 24);
+      b[1] = (byte) (l >>> 16);
+      b[2] = (byte) (l >>> 8);
+      b[3] = (byte) (l >>> 0);
+      return b;
+    }
+
+    private static String getExt(String filename) {
+      if (filename.indexOf(".") == -1)
+        return null;
+      return filename.substring(filename.lastIndexOf(".") + 1);
+    }
+
+    public String hexString(byte[] bytes) {
+      StringBuilder sb = new StringBuilder();
+      for (byte b : bytes) {
+        sb.append(String.format("%02x", b));
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * This iterator dedupes chunks and sets their visibilities to the combined visibility of the refs
+   * columns. For example, it would combine
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 A&amp;B V1
+   *    row1 ~chunk 0 C&amp;D V1
+   *    row1 ~chunk 0 E&amp;F V1
+   *    row1 ~chunk 0 G&amp;H V1
+   * </pre>
+   *
+   * into the following
+   *
+   * <pre>
+   *    row1 refs uid1\0a A&amp;B V0
+   *    row1 refs uid2\0b C&amp;D V0
+   *    row1 ~chunk 0 (A&amp;B)|(C&amp;D) V1
+   * </pre>
+   *
+   * {@link VisibilityCombiner} is used to combine the visibilities.
+   */
+  public static class ChunkCombiner implements SortedKeyValueIterator<Key,Value> {
+    private static final Logger log = LoggerFactory.getLogger(ChunkCombiner.class);
+
+    private SortedKeyValueIterator<Key,Value> source;
+    private SortedKeyValueIterator<Key,Value> refsSource;
+    private static final Collection<ByteSequence> refsColf =
+        Collections.singleton(FileDataIngest.REFS_CF_BS);
+    private Map<Text,byte[]> lastRowVC = Collections.emptyMap();
+
+    private Key topKey = null;
+    private Value topValue = null;
+
+    public ChunkCombiner() {}
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      this.source = source;
+      this.refsSource = source.deepCopy(env);
+      log.info("DUDE call init. deepCopy on {}", source.getClass());
+    }
+
+    @Override
+    public boolean hasTop() {
+      return topKey != null;
+    }
+
+    @Override
+    public void next() throws IOException {
+      findTop();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+        throws IOException {
+      source.seek(range, columnFamilies, inclusive);
+      findTop();
+    }
+
+    private void findTop() throws IOException {
+      do {
+        topKey = null;
+        topValue = null;
+      } while (source.hasTop() && _findTop() == null);
+    }
+
+    private byte[] _findTop() throws IOException {
+      long maxTS;
+
+      topKey = new Key(source.getTopKey());
+      topValue = new Value(source.getTopValue());
+      source.next();
+
+      if (!topKey.getColumnFamilyData().equals(FileDataIngest.CHUNK_CF_BS))
+        return topKey.getColumnVisibility().getBytes();
+
+      maxTS = topKey.getTimestamp();
+
+      while (source.hasTop() && source.getTopKey().equals(topKey, PartialKey.ROW_COLFAM_COLQUAL)) {
+        if (source.getTopKey().getTimestamp() > maxTS)
+          maxTS = source.getTopKey().getTimestamp();
+
+        if (!topValue.equals(source.getTopValue()))
+          throw new RuntimeException("values not equals " + topKey + " " + source.getTopKey()
+              + " : " + diffInfo(topValue, source.getTopValue()));
+
+        source.next();
+      }
+
+      byte[] vis = getVisFromRefs();
+      if (vis != null) {
+        topKey = new Key(topKey.getRowData().toArray(), topKey.getColumnFamilyData().toArray(),
+            topKey.getColumnQualifierData().toArray(), vis, maxTS);
+      }
+      return vis;
+    }
+
+    private byte[] getVisFromRefs() throws IOException {
+      Text row = topKey.getRow();
+      if (lastRowVC.containsKey(row))
+        return lastRowVC.get(row);
+      Range range = new Range(row);
+      refsSource.seek(range, refsColf, true);
+      VisibilityCombiner vc = null;
+      while (refsSource.hasTop()) {
+        if (vc == null)
+          vc = new VisibilityCombiner();
+        vc.add(refsSource.getTopKey().getColumnVisibilityData());
+        refsSource.next();
+      }
+      if (vc == null) {
+        lastRowVC = Collections.singletonMap(row, null);
+        return null;
+      }
+      lastRowVC = Collections.singletonMap(row, vc.get());
+      return vc.get();
+    }
+
+    private String diffInfo(Value v1, Value v2) {
+      if (v1.getSize() != v2.getSize()) {
+        return "val len not equal " + v1.getSize() + "!=" + v2.getSize();
+      }
+
+      byte[] vb1 = v1.get();
+      byte[] vb2 = v2.get();
+
+      for (int i = 0; i < vb1.length; i++) {
+        if (vb1[i] != vb2[i]) {
+          return String.format("first diff at offset %,d 0x%02x != 0x%02x", i, 0xff & vb1[i],
+              0xff & vb2[i]);
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public Key getTopKey() {
+      return topKey;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return topValue;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      ChunkCombiner cc = new ChunkCombiner();
+      try {
+        cc.init(source.deepCopy(env), null, env);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return cc;
+    }
+  }
+
+  /**
+   * Copied from 2.0 Examples. A utility for merging visibilities into the form
+   * {@code (VIS1)|(VIS2)|...|(VISN)}. Used by the ChunkCombiner.
+   */
+  public static class VisibilityCombiner {
+
+    private TreeSet<String> visibilities = new TreeSet<>();
+
+    void add(ByteSequence cv) {
+      if (cv.length() == 0)
+        return;
+
+      int depth = 0;
+      int offset = 0;
+
+      for (int i = 0; i < cv.length(); i++) {
+        switch (cv.byteAt(i)) {
+          case '(':
+            depth++;
+            break;
+          case ')':
+            depth--;
+            if (depth < 0)
+              throw new IllegalArgumentException("Invalid vis " + cv);
+            break;
+          case '|':
+            if (depth == 0) {
+              insert(cv.subSequence(offset, i));
+              offset = i + 1;
+            }
+
+            break;
+        }
+      }
+
+      insert(cv.subSequence(offset, cv.length()));
+
+      if (depth != 0)
+        throw new IllegalArgumentException("Invalid vis " + cv);
+
+    }
+
+    private void insert(ByteSequence cv) {
+      for (int i = 0; i < cv.length(); i++) {
+
+      }
+
+      String cvs = cv.toString();
+
+      if (cvs.charAt(0) != '(')
+        cvs = "(" + cvs + ")";
+      else {
+        int depth = 0;
+        int depthZeroCloses = 0;
+        for (int i = 0; i < cv.length(); i++) {
+          switch (cv.byteAt(i)) {
+            case '(':
+              depth++;
+              break;
+            case ')':
+              depth--;
+              if (depth == 0)
+                depthZeroCloses++;
+              break;
+          }
+        }
+
+        if (depthZeroCloses > 1)
+          cvs = "(" + cvs + ")";
+      }
+
+      visibilities.add(cvs);
+    }
+
+    byte[] get() {
+      StringBuilder sb = new StringBuilder();
+      String sep = "";
+      for (String cvs : visibilities) {
+        sb.append(sep);
+        sep = "|";
+        sb.append(cvs);
+      }
+
+      return sb.toString().getBytes();

Review comment:
       Should be `getBytes(UTF_8)`
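   For example (sketch, assuming UTF_8 is statically imported from java.nio.charset.StandardCharsets):
   <pre>
   return sb.toString().getBytes(UTF_8);
   </pre>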




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r744142174



##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilySkippingIterator.java
##########
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.iteratorsImpl.system;
+package org.apache.accumulo.core.iterators;

Review comment:
       Moved this iterator back to iteratorsImpl package in 1a4bb21607324ef6a9d2a0eeb458c0cdb3c59681




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime merged pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
milleruntime merged pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r743759663



##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilySkippingIterator.java
##########
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.iteratorsImpl.system;
+package org.apache.accumulo.core.iterators;

Review comment:
       Also, my original thinking in #1411 was that since SortedMapIterator was used so much in our tests, it would be useful to users as well. But I could be wrong in that thinking. It could be unsafe to expose it to users, so a better option may be to move it in with the other system iterators. I can't remember if I attempted this or how difficult it would be, though.
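   For reference, typical test usage is roughly this (just a sketch; UTF_8 assumed statically imported and IOException handling omitted):
   <pre>
   TreeMap<Key,Value> data = new TreeMap<>();
   data.put(new Key("row1", "cf", "cq"), new Value("v1".getBytes(UTF_8)));
   SortedKeyValueIterator<Key,Value> iter = new SortedMapIterator(data);
   iter.seek(new Range(), Collections.emptySet(), false);
   while (iter.hasTop()) {
     // inspect iter.getTopKey() / iter.getTopValue()
     iter.next();
   }
   </pre>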




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r743731017



##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
##########
@@ -36,28 +37,40 @@
  *
  * Note that this class is intended as an in-memory replacement for RFile$Reader, so its behavior
  * reflects the same assumptions; namely, that this iterator is not responsible for respecting the
- * columnFamilies passed into seek().
+ * columnFamilies passed into seek(). If you want a Map-backed Iterator that returns only sought
+ * CFs, construct a new ColumnFamilySkippingIterator(new SortedMapIterator(map)).
+ *
+ * @see org.apache.accumulo.core.iterators.ColumnFamilySkippingIterator
  */
-public class SortedMapIterator implements SortedKeyValueIterator<Key,Value> {
+public class SortedMapIterator implements InterruptibleIterator {
   private Iterator<Entry<Key,Value>> iter;
   private Entry<Key,Value> entry;
 
   private SortedMap<Key,Value> map;
   private Range range;
 
+  private AtomicBoolean interruptFlag;

Review comment:
       We should be able to make this final with an initial value of `false`. I haven't really touched the iterators since most of them have been around since forever but I think this would be a good fix. FYI I had a hell of a time tracking down where we actually do the interrupt. As a reference, it seems like the only place we actually interrupt iterators is in `ScanDataSource`:
   https://github.com/apache/accumulo/blob/8a636a3ba91f5dae1d8b09b095178889a7d79c1d/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java#L256-L258
   When a tablet is closed:
   https://github.com/apache/accumulo/blob/b5ca15806659efbbd6f5d158be8648cc2cd13593/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java#L1288
   
   We have a ton of calls to pass the flag around between iterators, readers and RFile code but that seems to be the only place where the actual interrupt takes place.
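   For context, the consuming side inside an interruptible iterator typically looks roughly like this (sketch, not the exact code):
   <pre>
   // somewhere in next()/seek() of the system iterator
   if (interruptFlag != null && interruptFlag.get())
     throw new IterationInterruptedException();
   </pre>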




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r745063876



##########
File path: core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SortedMapIterator.java
##########
@@ -121,4 +143,9 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
       IteratorEnvironment env) throws IOException {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public void setInterruptFlag(AtomicBoolean flag) {
+    this.interruptFlag = flag;
+  }

Review comment:
       My suggestion would be to use BooleanSupplier in the InterruptibleIterator interface, and call `setInterruptFlag(interruptFlag::get)` (where `interruptFlag` in the caller is a final of type `AtomicBoolean`). That would remove any confusion over implied atomicity guarantees inside the InterruptibleIterator interface, but wouldn't change any behavior. But, obviously, that would be a follow-on issue. It need not be done as part of this PR.
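   A rough sketch of that idea (signature hypothetical, just to show the shape of the change):
   <pre>
   // in InterruptibleIterator, instead of setInterruptFlag(AtomicBoolean flag)
   void setInterruptFlag(BooleanSupplier interruptCheck);

   // caller side, where interruptFlag is a final AtomicBoolean
   iter.setInterruptFlag(interruptFlag::get);
   </pre>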




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r743731017



##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
##########
@@ -36,28 +37,40 @@
  *
  * Note that this class is intended as an in-memory replacement for RFile$Reader, so its behavior
  * reflects the same assumptions; namely, that this iterator is not responsible for respecting the
- * columnFamilies passed into seek().
+ * columnFamilies passed into seek(). If you want a Map-backed Iterator that returns only sought
+ * CFs, construct a new ColumnFamilySkippingIterator(new SortedMapIterator(map)).
+ *
+ * @see org.apache.accumulo.core.iterators.ColumnFamilySkippingIterator
  */
-public class SortedMapIterator implements SortedKeyValueIterator<Key,Value> {
+public class SortedMapIterator implements InterruptibleIterator {
   private Iterator<Entry<Key,Value>> iter;
   private Entry<Key,Value> entry;
 
   private SortedMap<Key,Value> map;
   private Range range;
 
+  private AtomicBoolean interruptFlag;

Review comment:
       We should be able to make this final with an initial value of `false`. I haven't really touched the iterators since most of them have been around since forever but I think this would be a good fix. FYI I had a hell of a time tracking down where we actually do the interrupt. As a reference, it seems like the only place we actually interrupt iterators is in `ScanDataSource`:
   https://github.com/apache/accumulo/blob/8a636a3ba91f5dae1d8b09b095178889a7d79c1d/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java#L256-L258
   When a tablet is closed:
   https://github.com/apache/accumulo/blob/b5ca15806659efbbd6f5d158be8648cc2cd13593/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java#L1288
   
   We have a ton of calls to pass the flag around between iterators, readers and RFile code but that seems to be the only place where the actual interrupt takes place.

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilySkippingIterator.java
##########
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.iteratorsImpl.system;
+package org.apache.accumulo.core.iterators;

Review comment:
       That is what I had originally thought but after investigating this bug it seems like `SortedMapIterator` and this one could be cast to an InterruptibleIterator in the `LocalityGroupIterator`
   https://github.com/apache/accumulo/blob/8d33bc4d489880f15950189d20287262ce46e68e/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/LocalityGroupIterator.java#L49
   
   I think if a user extends one of them and configures it to run in certain situations it could be created through deepCopy(). But it also seems dangerous either way.

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilySkippingIterator.java
##########
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.iteratorsImpl.system;
+package org.apache.accumulo.core.iterators;

Review comment:
       Also, my original thinking in #1411 was that since SortedMapIterator was used so much in our tests, it would be useful to users as well. But I could be wrong in that thinking. It could be unsafe to expose it to users, so a better option may be to move it in with the other system iterators. I can't remember if I attempted this or how difficult it would be, though.

##########
File path: core/src/main/java/org/apache/accumulo/core/iterators/SortedMapIterator.java
##########
@@ -36,28 +37,40 @@
  *
  * Note that this class is intended as an in-memory replacement for RFile$Reader, so its behavior
  * reflects the same assumptions; namely, that this iterator is not responsible for respecting the
- * columnFamilies passed into seek().
+ * columnFamilies passed into seek(). If you want a Map-backed Iterator that returns only sought
+ * CFs, construct a new ColumnFamilySkippingIterator(new SortedMapIterator(map)).
+ *
+ * @see org.apache.accumulo.core.iterators.ColumnFamilySkippingIterator
  */
-public class SortedMapIterator implements SortedKeyValueIterator<Key,Value> {
+public class SortedMapIterator implements InterruptibleIterator {
   private Iterator<Entry<Key,Value>> iter;
   private Entry<Key,Value> entry;
 
   private SortedMap<Key,Value> map;
   private Range range;
 
+  private AtomicBoolean interruptFlag;

Review comment:
       I thought about changing the `AtomicBoolean` to be final but now I am wondering if it was written this way with performance in mind. We do a lot of passing around of the flag across the iterator stack but only ever set it in one place.  If we were to make it final, then I think that would be a lot of extra memory accesses to the atomic value vs just passing around a reference.
   
   I thought of doing something like this in the setter, but like I said, I think it would be less efficient:
   <pre>
   public void setInterruptFlag(AtomicBoolean flag) {
     boolean newFlag = flag.get();
     boolean previousFlag = this.interruptFlag.getAndSet(newFlag);
     if (previousFlag == newFlag && newFlag)
       log.warn("Interrupt called on iterator that was previously interrupted.");
   }
   </pre>




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2347: New IteratorMincClassCastBugIT and fixes to #1411

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2347:
URL: https://github.com/apache/accumulo/pull/2347#discussion_r744141838



##########
File path: test/src/main/java/org/apache/accumulo/test/functional/IteratorMincClassCastBugIT.java
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * Tests iterator class hierarchy bug. See https://github.com/apache/accumulo/issues/2341
+ */
+public class IteratorMincClassCastBugIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // this bug only shows up when not using native maps
+    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false");
+    cfg.setNumTservers(1);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,Set<Text>> groups = new HashMap<>();
+      groups.put("g1", Set.of(new Text("~chunk")));
+      groups.put("g2", Set.of(new Text("refs")));
+      ntc.setLocalityGroups(groups);
+
+      IteratorSetting iteratorSetting = new IteratorSetting(20, ChunkCombiner.class);
+      ntc.attachIterator(iteratorSetting, EnumSet.of(IteratorUtil.IteratorScope.minc));
+
+      c.tableOperations().create(tableName, ntc);
+
+      int chunkSize = 64 * 1024;
+      ColumnVisibility visibility = new ColumnVisibility();
+
+      List<URL> files = new ArrayList<>();
+      files.add(getClass().getClassLoader().getResource("testfile1.md"));
+
+      try (BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+        FileDataIngest fdi = new FileDataIngest(chunkSize, visibility);
+        for (URL filename : files) {
+          fdi.insertFileData(filename, bw);
+        }
+      }
+
+      c.tableOperations().flush(tableName, null, null, true);
+    }
+  }
+
+  /**
+   * Copied from 2.0 examples. Takes a list of files and archives them into Accumulo keyed on hashes
+   * of the files.
+   */
+  @SuppressFBWarnings(value = "WEAK_MESSAGE_DIGEST_MD5", justification = "For testing only")
+  public static class FileDataIngest {
+    public static final Text CHUNK_CF = new Text("~chunk");
+    public static final Text REFS_CF = new Text("refs");
+    public static final String REFS_ORIG_FILE = "name";
+    public static final String REFS_FILE_EXT = "filext";
+    public static final ByteSequence CHUNK_CF_BS =
+        new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+    public static final ByteSequence REFS_CF_BS =
+        new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+    int chunkSize;
+    byte[] chunkSizeBytes;
+    byte[] buf;
+    MessageDigest md5digest;
+    ColumnVisibility cv;
+
+    public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+      this.chunkSize = chunkSize;
+      chunkSizeBytes = intToBytes(chunkSize);
+      buf = new byte[chunkSize];
+      try {
+        md5digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+      cv = colvis;
+    }
+
+    public String insertFileData(URL fileURL, BatchWriter bw)
+        throws MutationsRejectedException, IOException {
+      String filename = fileURL.getFile();
+      if (chunkSize == 0)
+        return "";
+      md5digest.reset();
+      String uid = hexString(md5digest.digest(filename.getBytes()));
+
+      // read through file once, calculating hashes
+      md5digest.reset();
+      InputStream fis = null;
+      int numRead = 0;
+      // try (var reader = new InputStreamReader(accumuloPropsLocation.openStream(), UTF_8)) {
+      try {
+        fis = new FileInputStream(filename);
+        numRead = fis.read(buf);
+        while (numRead >= 0) {
+          if (numRead > 0) {
+            md5digest.update(buf, 0, numRead);
+          }
+          numRead = fis.read(buf);
+        }
+      } finally {
+        if (fis != null) {
+          fis.close();
+        }
+      }

Review comment:
       I rewrote the IT so it doesn't need to ingest a file now.
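   
   Writing the test data directly with a BatchWriter could look roughly like this (a hypothetical sketch, not the exact code now in the IT; the cell contents are made up and `c`/`tableName` come from the test above):
   <pre>
   try (BatchWriter bw = c.createBatchWriter(tableName)) {
     // Seed a couple of cells in the locality-group families instead of ingesting a file.
     Mutation m = new Mutation("row1");
     m.put("~chunk", "0000", new Value(new byte[64 * 1024]));
     m.put("refs", "name", new Value("testfile1.md"));
     bw.addMutation(m);
   }
   </pre>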




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org