You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2020/12/13 15:18:02 UTC

[GitHub] [drill] cgivre commented on a change in pull request #2125: DRILL-7525: Convert SequenceFiles to EVF

cgivre commented on a change in pull request #2125:
URL: https://github.com/apache/drill/pull/2125#discussion_r541939346



##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";

Review comment:
       Please use `ALL_CAPS` for constant names. 

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+

Review comment:
       I'd add a variable for the limit pushdown. 
   So... here add
   ```java
   private final int maxRecords;
   ```
   
   Then in the constructor, add:
   ```java
   maxRecords = scan.getMaxRecords();
   ```

##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java
##########
@@ -17,32 +17,59 @@
  */
 package org.apache.drill.exec.store.sequencefile;
 
-import java.io.DataOutputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.nio.file.Paths;
 
-import org.junit.Test;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryRowSetIterator;
 import org.apache.hadoop.io.BytesWritable;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
-public class TestSequenceFileReader extends BaseTestQuery {
+@Category(RowSetTests.class)
+public class TestSequenceFileReader extends ClusterTest {
 
-  public static String byteWritableString(String input) throws Exception {
-    final ByteArrayOutputStream bout = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(bout);
-    final BytesWritable writable = new BytesWritable(input.getBytes("UTF-8"));
-    writable.write(out);
-    return new String(bout.toByteArray());
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+    dirTestWatcher.copyResourceToRoot(Paths.get("sequencefiles/"));
   }
 
   @Test
   public void testSequenceFileReader() throws Exception {
     testBuilder()
-      .sqlQuery("select convert_from(t.binary_key, 'UTF8') as k, convert_from(t.binary_value, 'UTF8') as v " +
-        "from cp.`sequencefiles/simple.seq` t")
+      .sqlQuery("select convert_from(t.binary_key, 'UTF8') as k, convert_from(t.binary_value, 'UTF8') as v "
+        + "from cp.`sequencefiles/simple.seq` t")
       .ordered()
       .baselineColumns("k", "v")
       .baselineValues(byteWritableString("key0"), byteWritableString("value0"))
       .baselineValues(byteWritableString("key1"), byteWritableString("value1"))
-      .build().run();
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testOutput() {
+    String sql = "select convert_from(t.binary_key, 'UTF8'), convert_from(t.binary_value, 'UTF8')"
+        + "from cp.`sequencefiles/simple.seq` t";
+    QueryRowSetIterator iterator = queryBuilder().sql(sql).rowSetIterator();
+    for (RowSet rowset : iterator) {
+      rowset.print();
+      rowset.clear();
+    }
+  }
+
+  private static String byteWritableString(String input) throws Exception {
+    final ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(bout);
+    final BytesWritable writable = new BytesWritable(input.getBytes("UTF-8"));
+    writable.write(out);
+    return new String(bout.toByteArray());
   }
+

Review comment:
       I'd recommend some additional work on the unit tests.  First, please remove any code in the unit tests that produces output to STDOUT.  (IE any print statements) 
   
   Secondly, take a look at the test below:  I'd recommend refactoring all the unit tests for this in the pattern below. This pattern will test that the schema is being generated correctly, and that the outputs are properly mapped using EVF.
   
   I'd also add unit tests for:
   * Star Query
   * Explicit Field Query
   * Limit Pushdown (Plan only)
   * SerDe 
   * Compressed File (If applicable)
   
   You can pretty much cut/paste from the file below for all those tests. 
   
   
   https://github.com/apache/drill/blob/f79587ed14767463129f8905d2e36e55563655f2/contrib/format-spss/src/test/java/org/apache/drill/exec/store/spss/TestSpssReader.java#L110-L131

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();

Review comment:
       After you've defined the `split`, you should also define the `errorContext`.
   ```java
   errorContext = negotiator.parentErrorContext();
   ```
   Once you've done that, you can include the `errorContext` in all exceptions which gives the user a lot more information about the exception. 
   

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;

Review comment:
       These two vars do not appear to be used.  Please remove if unnecessary.  Specifically `opUserName` and `queryUserName`.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(

Review comment:
       Please consider throwing a `UserException` here (and elsewhere).

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);

Review comment:
       You should set the `isComplete` parameter here to `true` because you are not adding columns to the schema.  

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())

Review comment:
       You can remove the call to `e.getMessage()` here as it will be overwritten in the following `addContext()` call.  Also, I'd suggest moving the `e.getMessage()` to the actual `message()` call.  The file information will be provided via the `errorContext` so you don't need to include that in the message.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(keySchema);
+    valueWriter = loader.scalar(valueSchema);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    int recordCount = 0;
+    if (watch == null) {
+      watch = Stopwatch.createStarted();
+    }
+    try {
+      while (!loader.isFull()) {
+        if (reader.next(key, value)) {

Review comment:
       I strongly recommend adding the limit pushdown code.   All you need to do is add the code I referred to earlier and then in this loop add the following:
   ```java
   if (loader.limitReached(maxRecords) ) {
     return false;
   }

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();

Review comment:
       I think `setLoader` can be local. 

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(keySchema);
+    valueWriter = loader.scalar(valueSchema);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    int recordCount = 0;
+    if (watch == null) {
+      watch = Stopwatch.createStarted();
+    }
+    try {
+      while (!loader.isFull()) {
+        if (reader.next(key, value)) {
+          loader.start();
+          keyWriter.setBytes(key.getBytes(), key.getLength());
+          valueWriter.setBytes(value.getBytes(), value.getLength());
+          loader.save();
+          ++ recordCount;
+        } else {
+          logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
+          return false;
+        }
+      }
+    } catch (IOException e) {
+      close();
+      throw UserException
+              .dataReadError(e)

Review comment:
       Please make sure you add a message as to what went wrong here.    You'll want to make sure every error message has:
   * a clear message in the `message()` call and
   * a call to the `addContext(errorContext)`
   
   Here and elsewhere.
   

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(keySchema);
+    valueWriter = loader.scalar(valueSchema);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    int recordCount = 0;
+    if (watch == null) {
+      watch = Stopwatch.createStarted();
+    }
+    try {
+      while (!loader.isFull()) {
+        if (reader.next(key, value)) {
+          loader.start();
+          keyWriter.setBytes(key.getBytes(), key.getLength());
+          valueWriter.setBytes(value.getBytes(), value.getLength());
+          loader.save();
+          ++ recordCount;
+        } else {
+          logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
+          return false;
+        }
+      }
+    } catch (IOException e) {
+      close();
+      throw UserException
+              .dataReadError(e)
+              .addContext("File Path", split.getPath())
+              .build(logger);
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    } catch (IOException e) {
+      logger.warn("Exception closing reader: {}", e);
+    }
+  }
+
+  @Override
+  public String toString() {

Review comment:
       The `toString()` function is not really needed for the BatchReader classes. 

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(keySchema);
+    valueWriter = loader.scalar(valueSchema);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    int recordCount = 0;
+    if (watch == null) {
+      watch = Stopwatch.createStarted();
+    }
+    try {
+      while (!loader.isFull()) {
+        if (reader.next(key, value)) {
+          loader.start();
+          keyWriter.setBytes(key.getBytes(), key.getLength());
+          valueWriter.setBytes(value.getBytes(), value.getLength());
+          loader.save();
+          ++ recordCount;
+        } else {
+          logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
+          return false;
+        }
+      }
+    } catch (IOException e) {
+      close();

Review comment:
       The `close()` call is not needed here.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(keySchema);
+    valueWriter = loader.scalar(valueSchema);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    int recordCount = 0;
+    if (watch == null) {
+      watch = Stopwatch.createStarted();
+    }
+    try {
+      while (!loader.isFull()) {
+        if (reader.next(key, value)) {
+          loader.start();
+          keyWriter.setBytes(key.getBytes(), key.getLength());
+          valueWriter.setBytes(value.getBytes(), value.getLength());
+          loader.save();
+          ++ recordCount;
+        } else {
+          logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
+          return false;
+        }
+      }
+    } catch (IOException e) {
+      close();
+      throw UserException
+              .dataReadError(e)
+              .addContext("File Path", split.getPath())
+              .build(logger);
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (reader != null) {

Review comment:
       Does the reader implement `AutoCloseable`?  If so you might want to use that instead.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
##########
@@ -17,92 +17,77 @@
  */
 package org.apache.drill.exec.store.easy.sequencefile;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
 
 public class SequenceFileFormatPlugin extends EasyFormatPlugin<SequenceFileFormatConfig> {
-  public SequenceFileFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
-                                  StoragePluginConfig storageConfig) {
-    this(name, context, fsConf, storageConfig, new SequenceFileFormatConfig(null));
-  }
 
-  public SequenceFileFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
-                                  StoragePluginConfig storageConfig, SequenceFileFormatConfig formatConfig) {
-    super(name, context, fsConf, storageConfig, formatConfig,
-      true, false, /* splittable = */ true, /* compressible = */ true,
-      formatConfig.getExtensions(), "sequencefile");
+  public SequenceFileFormatPlugin(String name,
+                                  DrillbitContext context,
+                                  Configuration fsConf,
+                                  StoragePluginConfig storageConfig,
+                                  SequenceFileFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
   }
 
-  @Override
-  public boolean supportsPushDown() {
-    return true;
+  private static EasyFormatConfig easyConfig(Configuration fsConf, SequenceFileFormatConfig pluginConfig) {
+    EasyFormatConfig config = new EasyFormatConfig();
+    config.readable = true;
+    config.writable = false;
+    config.blockSplittable = true;
+    config.compressible = true;
+    config.extensions = pluginConfig.getExtensions();
+    config.fsConf = fsConf;
+    config.readerOperatorType = CoreOperatorType.SEQUENCE_SUB_SCAN_VALUE;
+    config.useEnhancedScan = true;
+    config.supportsProjectPushdown = true;
+    config.defaultName = SequenceFileFormatConfig.NAME;

Review comment:
       If you want to support the limit pushdown, you should add
   ```
   config.supportsLimitPushdown = true;
   ```
   to this method.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(keySchema);
+    valueWriter = loader.scalar(valueSchema);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    int recordCount = 0;
+    if (watch == null) {
+      watch = Stopwatch.createStarted();
+    }
+    try {
+      while (!loader.isFull()) {
+        if (reader.next(key, value)) {
+          loader.start();
+          keyWriter.setBytes(key.getBytes(), key.getLength());
+          valueWriter.setBytes(value.getBytes(), value.getLength());
+          loader.save();
+          ++ recordCount;
+        } else {
+          logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
+          return false;
+        }
+      }
+    } catch (IOException e) {
+      close();
+      throw UserException
+              .dataReadError(e)
+              .addContext("File Path", split.getPath())
+              .build(logger);
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    } catch (IOException e) {
+      logger.warn("Exception closing reader: {}", e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    long position = -1L;
+    try {
+      if (reader != null) {
+        position = reader.getPos();
+      }
+    } catch (IOException e) {
+      logger.trace("Unable to obtain reader position.", e);
+    }
+    return String.format("SequenceFileBatchReader[File=%s, Position=%s]", split.getPath(), position);
+  }
+

Review comment:
       NIT:  Remove extra line.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org