Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/11/25 23:25:10 UTC

[GitHub] [drill] paul-rogers commented on a change in pull request #2359: DRILL-8028: Add PDF Format Plugin

paul-rogers commented on a change in pull request #2359:
URL: https://github.com/apache/drill/pull/2359#discussion_r757144135



##########
File path: contrib/format-pdf/README.md
##########
@@ -0,0 +1,67 @@
+# Format Plugin for PDF Table Reader
+One of the most annoying tasks is when you are working on a data science project and you get data that is in a PDF file. This plugin endeavours to enable you to query data in
+ PDF tables using Drill's SQL interface.  

Review comment:
       Nit: extra leading space.

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);
+    populateMetadata();
+    logger.debug("Found {} tables", tables.size());
+
+    // Support provided schema
+    TupleMetadata schema = null;
+    if (this.negotiator.hasProvidedSchema()) {
+      schema = this.negotiator.providedSchema();
+      this.negotiator.tableSchema(schema, false);
+    } else {
+      this.negotiator.tableSchema(buildSchema(), false);
+    }
+    ResultSetLoader loader = this.negotiator.build();
+    rowWriter = loader.writer();
+
+    if (negotiator.hasProvidedSchema()) {
+      buildWriterListFromProvidedSchema(schema);
+    } else {
+      buildWriterList();
+    }
+    addImplicitColumnsToSchema();
+
+    // Prepare for reading
+    currentRowIndex = 1;  // Skip the first line if there are headers
+    currentTable = tables.get(startingTableIndex);
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    System.setProperty("java.awt.headless", "true");
+
+    while(!rowWriter.isFull()) {
+      // Check to see if the limit has been reached
+      if (rowWriter.limitReached(maxRecords)) {
+        return false;
+      } else if (currentRowIndex >= currentTable.getRows().size() &&
+                  currentTableIndex < tables.size() &&
+                  config.plugin.getConfig().getCombinePages()) {
+        currentRowIndex = 0;
+        currentTable = tables.get(currentTableIndex++);
+      } else if (currentRowIndex >= currentTable.getRows().size()) {
+        return false;
+      }
+
+      // Process the row
+      processRow(currentTable.getRows().get(currentRowIndex));
+      currentRowIndex++;
+    }
+    return true;
+  }
+
+  private void processRow(List<RectangularTextContainer> row) {
+    if (row == null || row.size() == 0) {
+      return;
+    }
+
+    String value;
+    rowWriter.start();
+    for (int i = 0; i < row.size(); i++) {
+      value = row.get(i).getText();
+
+      if (Strings.isNullOrEmpty(value)) {
+        continue;
+      }
+      writers.get(i).load(row.get(i));
+    }
+    writeMetadata();
+    rowWriter.save();
+  }
+
+  @Override
+  public void close() {
+    if (fsStream != null) {
+      AutoCloseables.closeSilently(fsStream);
+      fsStream = null;
+    }
+
+    if (document != null) {
+      AutoCloseables.closeSilently(document.getDocument());
+      AutoCloseables.closeSilently(document);
+      document = null;
+    }
+  }
+
+  /**
+   * This method opens the PDF file, and finds the tables
+   */
+  private void openFile() {
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      document = PDDocument.load(fsStream);
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open open input file: %s", split.getPath().toString())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);

Review comment:
       Better:
   
   ```
         throw UserException
           .dataReadError(e)
           .addContext("Failed to open open input file: %s", split.getPath().toString())
           .addContext(errorContext)
           .build(logger);
   ```

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);
+    populateMetadata();
+    logger.debug("Found {} tables", tables.size());
+
+    // Support provided schema
+    TupleMetadata schema = null;
+    if (this.negotiator.hasProvidedSchema()) {
+      schema = this.negotiator.providedSchema();
+      this.negotiator.tableSchema(schema, false);
+    } else {
+      this.negotiator.tableSchema(buildSchema(), false);
+    }
+    ResultSetLoader loader = this.negotiator.build();
+    rowWriter = loader.writer();
+
+    if (negotiator.hasProvidedSchema()) {
+      buildWriterListFromProvidedSchema(schema);
+    } else {
+      buildWriterList();
+    }
+    addImplicitColumnsToSchema();
+
+    // Prepare for reading
+    currentRowIndex = 1;  // Skip the first line if there are headers
+    currentTable = tables.get(startingTableIndex);
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    System.setProperty("java.awt.headless", "true");
+
+    while(!rowWriter.isFull()) {
+      // Check to see if the limit has been reached
+      if (rowWriter.limitReached(maxRecords)) {
+        return false;
+      } else if (currentRowIndex >= currentTable.getRows().size() &&
+                  currentTableIndex < tables.size() &&
+                  config.plugin.getConfig().getCombinePages()) {
+        currentRowIndex = 0;
+        currentTable = tables.get(currentTableIndex++);
+      } else if (currentRowIndex >= currentTable.getRows().size()) {
+        return false;
+      }
+
+      // Process the row
+      processRow(currentTable.getRows().get(currentRowIndex));
+      currentRowIndex++;
+    }
+    return true;
+  }
+
+  private void processRow(List<RectangularTextContainer> row) {
+    if (row == null || row.size() == 0) {
+      return;
+    }
+
+    String value;
+    rowWriter.start();
+    for (int i = 0; i < row.size(); i++) {
+      value = row.get(i).getText();
+
+      if (Strings.isNullOrEmpty(value)) {
+        continue;
+      }
+      writers.get(i).load(row.get(i));
+    }
+    writeMetadata();
+    rowWriter.save();
+  }
+
+  @Override
+  public void close() {
+    if (fsStream != null) {
+      AutoCloseables.closeSilently(fsStream);
+      fsStream = null;
+    }
+
+    if (document != null) {
+      AutoCloseables.closeSilently(document.getDocument());
+      AutoCloseables.closeSilently(document);
+      document = null;
+    }
+  }
+
+  /**
+   * This method opens the PDF file, and finds the tables
+   */
+  private void openFile() {
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      document = PDDocument.load(fsStream);
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open open input file: %s", split.getPath().toString())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
+  /**
+   * Metadata fields are calculated once when the file is opened.  This function populates
+   * the metadata fields so that these are only calculated once.
+   */
+  private void populateMetadata() {
+    PDDocumentInformation info = document.getDocumentInformation();
+    pageCount = document.getNumberOfPages();
+    title = info.getTitle();
+    author = info.getAuthor();
+    subject = info.getSubject();
+    keywords = info.getKeywords();
+    creator = info.getCreator();
+    producer = info.getProducer();
+    creationDate = info.getCreationDate();
+    modificationDate = info.getModificationDate();
+    trapped = info.getTrapped();
+    tableCount = tables.size();
+  }
+
+  private void addImplicitColumnsToSchema() {
+    metadataIndex = columns;
+    // Add to schema
+    addMetadataColumnToSchema("_page_count", MinorType.INT);
+    addMetadataColumnToSchema("_title", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_author", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_subject", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_keywords", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_creator", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_producer", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_creation_date", MinorType.TIMESTAMP);
+    addMetadataColumnToSchema("_modification_date", MinorType.TIMESTAMP);
+    addMetadataColumnToSchema("_trapped", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_table_count", MinorType.INT);
+  }
+
+  private void addMetadataColumnToSchema(String columnName, MinorType dataType) {
+    int index = rowWriter.tupleSchema().index(columnName);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(columnName, dataType, DataMode.OPTIONAL);
+
+      // Exclude from wildcard queries
+      colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+      metadataIndex++;
+
+      index = rowWriter.addColumn(colSchema);
+    }
+
+    writers.add(new StringPdfColumnWriter(index, columnName, rowWriter));
+  }
+
+  private void writeMetadata() {
+    int startingIndex = columnHeaders.size();
+    writers.get(startingIndex).getWriter().setInt(pageCount);
+    writeStringMetadataField(title, startingIndex+1);
+    writeStringMetadataField(author, startingIndex+2);
+    writeStringMetadataField(subject, startingIndex+3);
+    writeStringMetadataField(keywords, startingIndex+4);
+    writeStringMetadataField(creator, startingIndex+5);
+    writeStringMetadataField(producer, startingIndex+6);
+    writeTimestampMetadataField(creationDate, startingIndex+7);
+    writeTimestampMetadataField(modificationDate, startingIndex+8);
+    writeStringMetadataField(trapped, startingIndex+9);
+    writers.get(startingIndex+10).getWriter().setInt(tableCount);
+  }
+
+  private void writeStringMetadataField(String value, int index) {
+    if (value == null) {
+      return;
+    }
+    writers.get(index).getWriter().setString(value);
+  }
+
+  private void writeTimestampMetadataField(Calendar dateValue, int index) {
+    if (dateValue == null) {
+      return;
+    }
+
+    writers.get(index).getWriter().setTimestamp(Instant.ofEpochMilli(dateValue.getTimeInMillis()));
+  }
+
+  private void addUnknownColumnToSchemaAndCreateWriter (TupleWriter rowWriter, String name) {
+    int index = rowWriter.tupleSchema().index(name);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
+      index = rowWriter.addColumn(colSchema);
+    }
+    ScalarWriter colWriter = rowWriter.scalar(index);
+
+    // Create a new column name which will be field_n
+    String newColumnName = NEW_FIELD_PREFIX + unregisteredColumnCount;
+    unregisteredColumnCount++;
+
+    // Add a new writer.  Since we want the metadata always to be at the end of the schema, we must track the metadata
+    // index and add this before the metadata, so that the column index tracks with the writer index.
+    writers.add(metadataIndex, new StringPdfColumnWriter(1, newColumnName, (RowSetLoader) rowWriter));
+
+    // Increment the metadata index
+    metadataIndex++;
+  }
+
+  private TupleMetadata buildSchema() {
+    Table table = tables.get(startingTableIndex);
+    columns = table.getColCount();
+    rows = table.getRowCount();
+
+    // Get column header names
+    columnHeaders = Utils.extractRowValues(table);
+
+    // Add columns to table
+    int index = 0;
+    for (String columnName : columnHeaders) {
+      if (Strings.isNullOrEmpty(columnName)) {
+        columnName = NEW_FIELD_PREFIX + unregisteredColumnCount;
+        columnHeaders.set(index, columnName);
+        unregisteredColumnCount++;
+      }
+      builder.addNullable(columnName, MinorType.VARCHAR);
+      index++;
+    }
+
+    return builder.buildSchema();
+  }
+
+  private void buildWriterList() {
+    for (String header : columnHeaders) {
+      writers.add(new StringPdfColumnWriter(columnHeaders.indexOf(header), header, rowWriter));
+    }
+  }
+
+  private void buildWriterListFromProvidedSchema(TupleMetadata schema) {
+    if (schema == null) {
+      buildWriterList();
+      return;
+    }
+    int counter = 0;
+    for (MaterializedField field: schema.toFieldList()) {
+      String fieldName = field.getName();
+      MinorType type = field.getType().getMinorType();
+      columnHeaders.add(fieldName);
+
+      switch (type) {
+        case VARCHAR:
+          writers.add(new StringPdfColumnWriter(counter, fieldName, rowWriter));
+          break;
+        case SMALLINT:
+        case TINYINT:
+        case INT:
+          writers.add(new IntPdfColumnWriter(counter, fieldName, rowWriter));
+          break;
+        case BIGINT:
+          writers.add(new BigIntPdfColumnWriter(counter, fieldName, rowWriter));
+          break;
+        case FLOAT4:
+        case FLOAT8:
+          writers.add(new DoublePdfColumnWriter(counter, fieldName, rowWriter));
+          break;
+        case DATE:
+          writers.add(new DatePdfColumnWriter(counter, fieldName, rowWriter, negotiator));
+          break;
+        case TIME:
+          writers.add(new TimePdfColumnWriter(counter, fieldName, rowWriter, negotiator));
+          break;
+        case TIMESTAMP:
+          writers.add(new TimestampPdfColumnWriter(counter, fieldName, rowWriter, negotiator));
+          break;
+        default:
+          throw UserException.unsupportedError()
+            .message("PDF Reader with provided schema does not support " + type.name() + " data type.")
+            .addContext(errorContext)
+            .build(logger);
+      }
+    }
+  }
+
+  public abstract static class PdfColumnWriter {
+    final String columnName;
+    final ScalarWriter writer;
+    final int columnIndex;
+
+    public PdfColumnWriter(int columnIndex, String columnName, ScalarWriter writer) {
+      this.columnIndex = columnIndex;
+      this.columnName = columnName;
+      this.writer = writer;
+    }
+
+    public abstract void load (RectangularTextContainer<?> cell);
+
+    public ScalarWriter getWriter() {
+      return writer;
+    }
+  }
+
+  public static class IntPdfColumnWriter extends PdfColumnWriter {
+    IntPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+    }
+
+    @Override
+    public void load(RectangularTextContainer<?> cell) {
+      writer.setInt(Integer.parseInt(cell.getText()));
+    }
+  }
+
+  public static class BigIntPdfColumnWriter extends PdfColumnWriter {
+    BigIntPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+    }
+
+    @Override
+    public void load(RectangularTextContainer<?> cell) {
+      writer.setLong(Long.parseLong(cell.getText()));

Review comment:
       First, bravo for using this trick to optimize the innermost per-column load path.
   
   What happens if the parse fails? Need to catch the number format exception and convert it to a `UserException`. Or, you could let the EVF do it for you with its column converters which do exactly this work.
   
   Aside: if this were a quick & dirty add-on, I'd not worry about error handling or performance. But, because it is part of core Drill, and Drill must scale, we have to worry about all these details, even if maybe no one will ever read a zillion huge PDFs. Because, if I don't, how do I then turn around and ask the next reader to provide a scalable solution?
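   
    A sketch of the catch-and-convert option (assumes the writer is given access to the reader's `errorContext`, e.g. through its constructor; `logger` is already reachable as a static field):
   
    ```java
    @Override
    public void load(RectangularTextContainer<?> cell) {
      try {
        writer.setLong(Long.parseLong(cell.getText()));
      } catch (NumberFormatException e) {
        // Convert the unchecked parse failure into Drill's UserException
        // so the user sees which value could not be read as a BIGINT.
        throw UserException.dataReadError(e)
          .message("Value '%s' is not a valid BIGINT", cell.getText())
          .addContext(errorContext)
          .build(logger);
      }
    }
    ```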

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");

Review comment:
       Oh, my. Drill can run embedded. Suppose I'm running Drill embedded in my AWT application. Will this break things?
   
    Drill is multi-threaded: we can have a dozen readers all setting this same system property concurrently with other code reading it. Is this a safe thing to do?
   
   More fundamentally, should we be mucking about with shared system properties in a reader? Probably not.
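   
    If it has to stay at all, a static initializer would at least set the property once per class load instead of on every `open()` and `next()` call (a sketch; it still doesn't resolve the embedded-Drill concern):
   
    ```java
    static {
      // Runs once when PdfBatchReader is loaded, not once per reader instance.
      System.setProperty("java.awt.headless", "true");
    }
    ```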

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);
+    populateMetadata();
+    logger.debug("Found {} tables", tables.size());

Review comment:
       Think concurrency. Found {} tables in which file?
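   
    For example, `split` is already set by this point, so something like this should work:
   
    ```java
    logger.debug("Found {} tables in file {}", tables.size(), split.getPath());
    ```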

##########
File path: contrib/format-pdf/README.md
##########
@@ -0,0 +1,67 @@
+# Format Plugin for PDF Table Reader
+One of the most annoying tasks is when you are working on a data science project and you get data that is in a PDF file. This plugin endeavours to enable you to query data in
+ PDF tables using Drill's SQL interface.  
+
+## Data Model
+Since PDF files generally are not intended to be queried or read by machines, mapping the data to tables and rows is not a perfect process.  The PDF reader does support 
+provided schema. 
+
+### Merging Pages
+The PDF reader reads tables from PDF files on each page.  If your PDF file has tables that span multiple pages, you can set the `combinePages` parameter to `true` and Drill 
+will merge all the tables in the PDF file.  You can also do this at query time with the `table()` function.
+
+## Configuration
+To configure the PDF reader, simply add the information below to the `formats` section of a file base storage plugin. 

Review comment:
       What is a "file base storage plugin"? Maybe add "such as dfs".

##########
File path: contrib/format-pdf/README.md
##########
@@ -0,0 +1,67 @@
+# Format Plugin for PDF Table Reader
+One of the most annoying tasks is when you are working on a data science project and you get data that is in a PDF file. This plugin endeavours to enable you to query data in
+ PDF tables using Drill's SQL interface.  
+
+## Data Model
+Since PDF files generally are not intended to be queried or read by machines, mapping the data to tables and rows is not a perfect process.  The PDF reader does support 
+provided schema. 

Review comment:
       Probably want to link to the reference for provided schema. I suspect it's not a well-known feature. Also, explain what the provided schema provides (fields within the table?)

##########
File path: contrib/format-pdf/README.md
##########
@@ -0,0 +1,67 @@
+# Format Plugin for PDF Table Reader
+One of the most annoying tasks is when you are working on a data science project and you get data that is in a PDF file. This plugin endeavours to enable you to query data in
+ PDF tables using Drill's SQL interface.  
+
+## Data Model
+Since PDF files generally are not intended to be queried or read by machines, mapping the data to tables and rows is not a perfect process.  The PDF reader does support 
+provided schema. 
+
+### Merging Pages
+The PDF reader reads tables from PDF files on each page.  If your PDF file has tables that span multiple pages, you can set the `combinePages` parameter to `true` and Drill 
+will merge all the tables in the PDF file.  You can also do this at query time with the `table()` function.
+
+## Configuration
+To configure the PDF reader, simply add the information below to the `formats` section of a file base storage plugin. 
+
+```json
+"pdf": {
+  "type": "pdf",
+  "extensions": [
+    "pdf"
+  ],
+  "extractHeaders": true,
+  "combinePages": false
+}
+```
+The available options are:
+* `extractHeaders`: Extracts the first row of any tables as the header row.  If set to false, Drill will assign column names of `field_0`, `field_1` to each column.
+* `combinePages`: Merges multipage tables together.
+* `defaultTableIndex`:  Allows you to query different tables within the PDF file. Index starts at `0`. 
+
+
+## Accessing Document Metadata Fields
+PDF files have a considerable amount of metadata which can be useful for analysis.  Drill will extract the following fields from every PDF file.  Note that these fields are not
+ projected in star queries and must be selected explicitly.  The document's creator populates these fields and some or all may be empty. With the exception of `_page_count

Review comment:
       Nit: leading space.

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);
+    populateMetadata();
+    logger.debug("Found {} tables", tables.size());
+
+    // Support provided schema
+    TupleMetadata schema = null;
+    if (this.negotiator.hasProvidedSchema()) {
+      schema = this.negotiator.providedSchema();
+      this.negotiator.tableSchema(schema, false);
+    } else {
+      this.negotiator.tableSchema(buildSchema(), false);
+    }
+    ResultSetLoader loader = this.negotiator.build();
+    rowWriter = loader.writer();
+
+    if (negotiator.hasProvidedSchema()) {
+      buildWriterListFromProvidedSchema(schema);
+    } else {
+      buildWriterList();
+    }
+    addImplicitColumnsToSchema();
+
+    // Prepare for reading
+    currentRowIndex = 1;  // Skip the first line if there are headers
+    currentTable = tables.get(startingTableIndex);
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    System.setProperty("java.awt.headless", "true");
+
+    while(!rowWriter.isFull()) {
+      // Check to see if the limit has been reached
+      if (rowWriter.limitReached(maxRecords)) {
+        return false;
+      } else if (currentRowIndex >= currentTable.getRows().size() &&
+                  currentTableIndex < tables.size() &&
+                  config.plugin.getConfig().getCombinePages()) {
+        currentRowIndex = 0;
+        currentTable = tables.get(currentTableIndex++);
+      } else if (currentRowIndex >= currentTable.getRows().size()) {
+        return false;
+      }
+
+      // Process the row
+      processRow(currentTable.getRows().get(currentRowIndex));
+      currentRowIndex++;
+    }
+    return true;
+  }
+
+  private void processRow(List<RectangularTextContainer> row) {
+    if (row == null || row.size() == 0) {
+      return;
+    }
+
+    String value;
+    rowWriter.start();
+    for (int i = 0; i < row.size(); i++) {
+      value = row.get(i).getText();

Review comment:
       Nit:
   
    ```java
        RectangularTextContainer<?> cell = row.get(i);
        if (!Strings.isNullOrEmpty(cell.getText())) {
          writers.get(i).load(cell);
        }
    ```

##########
File path: contrib/format-pdf/README.md
##########
@@ -0,0 +1,67 @@
+# Format Plugin for PDF Table Reader
+One of the most annoying tasks is when you are working on a data science project and you get data that is in a PDF file. This plugin endeavours to enable you to query data in
+ PDF tables using Drill's SQL interface.  
+
+## Data Model
+Since PDF files generally are not intended to be queried or read by machines, mapping the data to tables and rows is not a perfect process.  The PDF reader does support 
+provided schema. 
+
+### Merging Pages
+The PDF reader reads tables from PDF files on each page.  If your PDF file has tables that span multiple pages, you can set the `combinePages` parameter to `true` and Drill 
+will merge all the tables in the PDF file.  You can also do this at query time with the `table()` function.
+
+## Configuration
+To configure the PDF reader, simply add the information below to the `formats` section of a file base storage plugin. 
+
+```json
+"pdf": {
+  "type": "pdf",
+  "extensions": [
+    "pdf"
+  ],
+  "extractHeaders": true,
+  "combinePages": false
+}
+```
+The available options are:
+* `extractHeaders`: Extracts the first row of any tables as the header row.  If set to false, Drill will assign column names of `field_0`, `field_1` to each column.
+* `combinePages`: Merges multipage tables together.
+* `defaultTableIndex`:  Allows you to query different tables within the PDF file. Index starts at `0`. 

Review comment:
       It should start with 1: makes it easier for humans to count.

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);
+    populateMetadata();
+    logger.debug("Found {} tables", tables.size());
+
+    // Support provided schema
+    TupleMetadata schema = null;
+    if (this.negotiator.hasProvidedSchema()) {
+      schema = this.negotiator.providedSchema();
+      this.negotiator.tableSchema(schema, false);
+    } else {
+      this.negotiator.tableSchema(buildSchema(), false);
+    }
+    ResultSetLoader loader = this.negotiator.build();
+    rowWriter = loader.writer();
+
+    if (negotiator.hasProvidedSchema()) {
+      buildWriterListFromProvidedSchema(schema);
+    } else {
+      buildWriterList();
+    }
+    addImplicitColumnsToSchema();

Review comment:
       I'd think you'd want to do this before creating the writer. Add columns to the schema before creating the vectors so that we get a simple picture before we start grinding through the vector details.
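   
    Roughly this shape (a sketch only; it assumes `addImplicitColumnsToSchema` is reworked into a hypothetical variant that appends to the `TupleMetadata` rather than to the live `rowWriter`):
   
    ```java
    TupleMetadata schema = negotiator.hasProvidedSchema()
        ? negotiator.providedSchema()
        : buildSchema();
    addImplicitColumnsToSchema(schema);  // hypothetical overload taking the schema
    negotiator.tableSchema(schema, false);
    ResultSetLoader loader = negotiator.build();
    rowWriter = loader.writer();
    ```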

##########
File path: contrib/format-pdf/README.md
##########
@@ -0,0 +1,67 @@
+# Format Plugin for PDF Table Reader
+One of the most annoying tasks is when you are working on a data science project and you get data that is in a PDF file. This plugin endeavours to enable you to query data in
+ PDF tables using Drill's SQL interface.  
+
+## Data Model
+Since PDF files generally are not intended to be queried or read by machines, mapping the data to tables and rows is not a perfect process.  The PDF reader does support 
+provided schema. 
+
+### Merging Pages
+The PDF reader reads tables from PDF files on each page.  If your PDF file has tables that span multiple pages, you can set the `combinePages` parameter to `true` and Drill 
+will merge all the tables in the PDF file.  You can also do this at query time with the `table()` function.
+
+## Configuration
+To configure the PDF reader, simply add the information below to the `formats` section of a file base storage plugin. 
+
+```json
+"pdf": {
+  "type": "pdf",
+  "extensions": [
+    "pdf"
+  ],
+  "extractHeaders": true,
+  "combinePages": false
+}
+```
+The available options are:
+* `extractHeaders`: Extracts the first row of any tables as the header row.  If set to false, Drill will assign column names of `field_0`, `field_1` to each column.

Review comment:
       Explain a bit more. I have two tables with titles "a | b", and "c | d | e". Do I get a union row with "a, b, c, d, e" as fields? Or, just "a, b" as fields? When I read the second table, do I then get "a, b, e"?

##########
File path: contrib/format-pdf/README.md
##########
@@ -0,0 +1,67 @@
+# Format Plugin for PDF Table Reader
+One of the most annoying tasks is when you are working on a data science project and you get data that is in a PDF file. This plugin endeavours to enable you to query data in
+ PDF tables using Drill's SQL interface.  
+
+## Data Model
+Since PDF files generally are not intended to be queried or read by machines, mapping the data to tables and rows is not a perfect process.  The PDF reader does support 
+provided schema. 
+
+### Merging Pages
+The PDF reader reads tables from PDF files on each page.  If your PDF file has tables that span multiple pages, you can set the `combinePages` parameter to `true` and Drill 
+will merge all the tables in the PDF file.  You can also do this at query time with the `table()` function.
+
+## Configuration
+To configure the PDF reader, simply add the information below to the `formats` section of a file base storage plugin. 
+
+```json
+"pdf": {
+  "type": "pdf",
+  "extensions": [
+    "pdf"
+  ],
+  "extractHeaders": true,
+  "combinePages": false
+}
+```
+The available options are:
+* `extractHeaders`: Extracts the first row of any tables as the header row.  If set to false, Drill will assign column names of `field_0`, `field_1` to each column.
+* `combinePages`: Merges multipage tables together.
+* `defaultTableIndex`:  Allows you to query different tables within the PDF file. Index starts at `0`. 
+
+
+## Accessing Document Metadata Fields
+PDF files have a considerable amount of metadata which can be useful for analysis.  Drill will extract the following fields from every PDF file.  Note that these fields are not
+ projected in star queries and must be selected explicitly.  The document's creator populates these fields and some or all may be empty. With the exception of `_page_count
+ ` which is an `INT` and the two date fields, all the other fields are `VARCHAR` fields.
+ 
+ The fields are:
+ * `_page_count`
+ * `_author`
+ * `_title`
+ * `_keywords`
+ * `_creator`
+ * `_producer`
+ * `_creation_date`
+ * `_modification_date`
+ * `_trapped`
+ * `_table_count`

Review comment:
       Here, I assume `_author` will repeat for every table row? That `_page_count` will increase and give the page number? Or is it constant, and will always be, say, 25 if that's the number of pages?
   
   Does `_table_count` stay fixed at the number of tables (5, say), or does it count tables (1, 2, 3, 4, 5)?
   
   What is `_trapped`? Is there some PDF standard we could point to where this stuff is defined?
   
   Actually, there is a larger question. If I'm exploring a pile of PDFs, I may want to extract metadata, such as the above. If I'm extracting tables, I don't need the metadata repeated per table row.
   
   Should the plugin allow two query types? Either metadata or tables? Can this be done with some kind of Calcite trickery? `FROM "foo.pdf"` means the tables, `FROM "foo.pdf.metadata"` means the metadata?
   
   Or do we check, and if we see only metadata columns, return one row per file rather than one row per table row?
   
   Wouldn't something like this be useful to extract metadata from a PDF with no tables?

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);
+    populateMetadata();
+    logger.debug("Found {} tables", tables.size());
+
+    // Support provided schema
+    TupleMetadata schema = null;
+    if (this.negotiator.hasProvidedSchema()) {
+      schema = this.negotiator.providedSchema();
+      this.negotiator.tableSchema(schema, false);
+    } else {
+      this.negotiator.tableSchema(buildSchema(), false);
+    }
+    ResultSetLoader loader = this.negotiator.build();
+    rowWriter = loader.writer();
+
+    if (negotiator.hasProvidedSchema()) {
+      buildWriterListFromProvidedSchema(schema);
+    } else {
+      buildWriterList();
+    }
+    addImplicitColumnsToSchema();
+
+    // Prepare for reading
+    currentRowIndex = 1;  // Skip the first line if there are headers
+    currentTable = tables.get(startingTableIndex);
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    System.setProperty("java.awt.headless", "true");
+
+    while(!rowWriter.isFull()) {
+      // Check to see if the limit has been reached
+      if (rowWriter.limitReached(maxRecords)) {

Review comment:
       Is this for `LIMIT` pushdown? If so, I'd suggest we add it to the RSL itself as a new parameter. That way, the RSL knows to declare EOF and stop reading from the input once the limit is reached. It can also ignore any rows past the limit.
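
   Roughly this idea, as a standalone sketch (not the actual RSL API; the names here are illustrative only):

   ```java
   // Sketch: the loader, not each reader, owns the pushed-down limit and
   // declares EOF once it is reached, ignoring any rows written past it.
   class LimitAwareLoader {
     private final int limit;   // pushed-down LIMIT; -1 means no limit
     private int rowCount;

     LimitAwareLoader(int limit) { this.limit = limit; }

     // True once the pushed-down limit has been reached; readers could
     // consult this the same way they consult isFull() today.
     boolean atLimit() {
       return limit >= 0 && rowCount >= limit;
     }

     // Returns false (EOF) instead of saving rows past the limit.
     boolean saveRow() {
       if (atLimit()) {
         return false;
       }
       rowCount++;
       return true;
     }
   }
   ```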

##########
File path: contrib/format-pdf/README.md
##########
@@ -0,0 +1,67 @@
+# Format Plugin for PDF Table Reader
+One of the most annoying tasks is when you are working on a data science project and you get data that is in a PDF file. This plugin endeavours to enable you to query data in
+ PDF tables using Drill's SQL interface.  
+
+## Data Model
+Since PDF files generally are not intended to be queried or read by machines, mapping the data to tables and rows is not a perfect process.  The PDF reader does support 
+provided schema. 
+
+### Merging Pages
+The PDF reader reads tables from PDF files on each page.  If your PDF file has tables that span multiple pages, you can set the `combinePages` parameter to `true` and Drill 
+will merge all the tables in the PDF file.  You can also do this at query time with the `table()` function.

Review comment:
       Suppose my file has three mutually incompatible tables, and I only want to read the first. Can I? If so, how?
   
   Or do I read all of them (sales of apples by state, shelf life of various apple kinds, list of largest apple growers) into one big, messy combined table, and then use a `WHERE` clause to try to keep just the shelf-life info?

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);
+    populateMetadata();
+    logger.debug("Found {} tables", tables.size());
+
+    // Support provided schema
+    TupleMetadata schema = null;
+    if (this.negotiator.hasProvidedSchema()) {
+      schema = this.negotiator.providedSchema();
+      this.negotiator.tableSchema(schema, false);
+    } else {
+      this.negotiator.tableSchema(buildSchema(), false);
+    }
+    ResultSetLoader loader = this.negotiator.build();
+    rowWriter = loader.writer();
+
+    if (negotiator.hasProvidedSchema()) {
+      buildWriterListFromProvidedSchema(schema);
+    } else {
+      buildWriterList();
+    }
+    addImplicitColumnsToSchema();
+
+    // Prepare for reading
+    currentRowIndex = 1;  // Skip the first line if there are headers
+    currentTable = tables.get(startingTableIndex);
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    System.setProperty("java.awt.headless", "true");

Review comment:
       See the comment above: we shouldn't be re-setting this global system property on every call to `next()`!
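
   For what it's worth, setting the flag once per class load would look something like this (a sketch; whether it belongs here, in the plugin, or at Drillbit startup is a design question):

   ```java
   // Sketch: set the AWT headless flag once, when the class is loaded,
   // instead of on every open() and next() call.
   static {
     System.setProperty("java.awt.headless", "true");
   }
   ```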

##########
File path: contrib/format-pdf/README.md
##########
@@ -0,0 +1,67 @@
+# Format Plugin for PDF Table Reader
+One of the most annoying tasks is when you are working on a data science project and you get data that is in a PDF file. This plugin endeavours to enable you to query data in
+ PDF tables using Drill's SQL interface.  
+
+## Data Model
+Since PDF files generally are not intended to be queried or read by machines, mapping the data to tables and rows is not a perfect process.  The PDF reader does support 
+provided schema. 
+
+### Merging Pages
+The PDF reader reads tables from PDF files on each page.  If your PDF file has tables that span multiple pages, you can set the `combinePages` parameter to `true` and Drill 
+will merge all the tables in the PDF file.  You can also do this at query time with the `table()` function.
+
+## Configuration
+To configure the PDF reader, simply add the information below to the `formats` section of a file base storage plugin. 
+
+```json
+"pdf": {
+  "type": "pdf",
+  "extensions": [
+    "pdf"
+  ],
+  "extractHeaders": true,
+  "combinePages": false
+}
+```
+The available options are:
+* `extractHeaders`: Extracts the first row of any tables as the header row.  If set to false, Drill will assign column names of `field_0`, `field_1` to each column.
+* `combinePages`: Merges multipage tables together.
+* `defaultTableIndex`:  Allows you to query different tables within the PDF file. Index starts at `0`. 
+
+
+## Accessing Document Metadata Fields
+PDF files have a considerable amount of metadata which can be useful for analysis.  Drill will extract the following fields from every PDF file.  Note that these fields are not
+ projected in star queries and must be selected explicitly.  The document's creator populates these fields and some or all may be empty. With the exception of `_page_count
+ ` which is an `INT` and the two date fields, all the other fields are `VARCHAR` fields.

Review comment:
       Nit: I think the newline gets included in the quoted bit.

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);
+    populateMetadata();
+    logger.debug("Found {} tables", tables.size());
+
+    // Support provided schema
+    TupleMetadata schema = null;
+    if (this.negotiator.hasProvidedSchema()) {
+      schema = this.negotiator.providedSchema();
+      this.negotiator.tableSchema(schema, false);
+    } else {
+      this.negotiator.tableSchema(buildSchema(), false);
+    }
+    ResultSetLoader loader = this.negotiator.build();
+    rowWriter = loader.writer();
+
+    if (negotiator.hasProvidedSchema()) {
+      buildWriterListFromProvidedSchema(schema);
+    } else {
+      buildWriterList();
+    }
+    addImplicitColumnsToSchema();
+
+    // Prepare for reading
+    currentRowIndex = 1;  // Skip the first line if there are headers
+    currentTable = tables.get(startingTableIndex);
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    System.setProperty("java.awt.headless", "true");
+
+    while(!rowWriter.isFull()) {
+      // Check to see if the limit has been reached
+      if (rowWriter.limitReached(maxRecords)) {
+        return false;
+      } else if (currentRowIndex >= currentTable.getRows().size() &&
+                  currentTableIndex < tables.size() &&
+                  config.plugin.getConfig().getCombinePages()) {
+        currentRowIndex = 0;
+        currentTable = tables.get(currentTableIndex++);
+      } else if (currentRowIndex >= currentTable.getRows().size()) {
+        return false;
+      }
+
+      // Process the row
+      processRow(currentTable.getRows().get(currentRowIndex));
+      currentRowIndex++;
+    }
+    return true;
+  }
+
+  private void processRow(List<RectangularTextContainer> row) {
+    if (row == null || row.size() == 0) {
+      return;
+    }
+
+    String value;
+    rowWriter.start();
+    for (int i = 0; i < row.size(); i++) {
+      value = row.get(i).getText();
+
+      if (Strings.isNullOrEmpty(value)) {
+        continue;
+      }
+      writers.get(i).load(row.get(i));
+    }
+    writeMetadata();
+    rowWriter.save();
+  }
+
+  @Override
+  public void close() {
+    if (fsStream != null) {
+      AutoCloseables.closeSilently(fsStream);
+      fsStream = null;
+    }
+
+    if (document != null) {
+      AutoCloseables.closeSilently(document.getDocument());
+      AutoCloseables.closeSilently(document);
+      document = null;
+    }
+  }
+
+  /**
+   * This method opens the PDF file, and finds the tables
+   */
+  private void openFile() {
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      document = PDDocument.load(fsStream);
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: %s", split.getPath().toString())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
+  /**
+   * Metadata fields are calculated once when the file is opened.  This function populates
+   * the metadata fields so that these are only calculated once.
+   */
+  private void populateMetadata() {
+    PDDocumentInformation info = document.getDocumentInformation();
+    pageCount = document.getNumberOfPages();
+    title = info.getTitle();
+    author = info.getAuthor();
+    subject = info.getSubject();
+    keywords = info.getKeywords();
+    creator = info.getCreator();
+    producer = info.getProducer();
+    creationDate = info.getCreationDate();
+    modificationDate = info.getModificationDate();
+    trapped = info.getTrapped();
+    tableCount = tables.size();

Review comment:
       Suggestion: create a `Metadata` class to hold this stuff rather than adding it to the base reader. More modular.
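
   Something along these lines (a hypothetical `PdfMetadata` holder; the field set is copied from `populateMetadata()` above):

   ```java
   // Hypothetical holder for the document-level fields, populated once from
   // PDDocumentInformation instead of spreading the fields across the reader.
   class PdfMetadata {
     final int pageCount;
     final String title;
     final String author;
     final String subject;
     final String keywords;
     final String creator;
     final String producer;
     final Calendar creationDate;
     final Calendar modificationDate;
     final String trapped;
     final int tableCount;

     PdfMetadata(PDDocument document, List<Table> tables) {
       PDDocumentInformation info = document.getDocumentInformation();
       pageCount = document.getNumberOfPages();
       title = info.getTitle();
       author = info.getAuthor();
       subject = info.getSubject();
       keywords = info.getKeywords();
       creator = info.getCreator();
       producer = info.getProducer();
       creationDate = info.getCreationDate();
       modificationDate = info.getModificationDate();
       trapped = info.getTrapped();
       tableCount = tables.size();
     }
   }
   ```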

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);

Review comment:
       Does this scale? All the data is extracted on open and cached in memory? In each of a dozen or more PDF readers in the same process? (This is why having this kind of plugin in the core is problematic: it won't scale for PDFs with large numbers of tables.)
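
   One possible shape, sketched against the Tabula API already used in `Utils.java` (the `pageIterator` field and `nextPageOfTables()` helper are hypothetical):

   ```java
   // Hypothetical incremental variant: keep only the current page's tables
   // in memory, pulling the next page from the PageIterator as next()
   // drains the current one.
   private PageIterator pageIterator;   // created once in open()

   private List<Table> nextPageOfTables() {
     if (!pageIterator.hasNext()) {
       return Collections.emptyList();   // EOF
     }
     Page page = pageIterator.next();
     // Detect table regions on this page only, then extract just those tables.
     List<Table> pageTables = new ArrayList<>();
     for (Rectangle region : new NurminenDetectionAlgorithm().detect(page)) {
       pageTables.addAll(Utils.DEFAULT_ALGORITHM.extract(page.getArea(region)));
     }
     return pageTables;
   }
   ```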

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);
+    populateMetadata();
+    logger.debug("Found {} tables", tables.size());
+
+    // Support provided schema
+    TupleMetadata schema = null;
+    if (this.negotiator.hasProvidedSchema()) {
+      schema = this.negotiator.providedSchema();
+      this.negotiator.tableSchema(schema, false);
+    } else {
+      this.negotiator.tableSchema(buildSchema(), false);
+    }
+    ResultSetLoader loader = this.negotiator.build();
+    rowWriter = loader.writer();
+
+    if (negotiator.hasProvidedSchema()) {
+      buildWriterListFromProvidedSchema(schema);
+    } else {
+      buildWriterList();
+    }
+    addImplicitColumnsToSchema();
+
+    // Prepare for reading
+    currentRowIndex = 1;  // Skip the first line if there are headers
+    currentTable = tables.get(startingTableIndex);
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    System.setProperty("java.awt.headless", "true");
+
+    while(!rowWriter.isFull()) {
+      // Check to see if the limit has been reached
+      if (rowWriter.limitReached(maxRecords)) {
+        return false;
+      } else if (currentRowIndex >= currentTable.getRows().size() &&
+                  currentTableIndex < tables.size() &&
+                  config.plugin.getConfig().getCombinePages()) {
+        currentRowIndex = 0;
+        currentTable = tables.get(currentTableIndex++);
+      } else if (currentRowIndex >= currentTable.getRows().size()) {
+        return false;
+      }
+
+      // Process the row
+      processRow(currentTable.getRows().get(currentRowIndex));
+      currentRowIndex++;
+    }
+    return true;
+  }
+
+  private void processRow(List<RectangularTextContainer> row) {
+    if (row == null || row.size() == 0) {
+      return;
+    }
+
+    String value;
+    rowWriter.start();
+    for (int i = 0; i < row.size(); i++) {
+      value = row.get(i).getText();
+
+      if (Strings.isNullOrEmpty(value)) {
+        continue;
+      }
+      writers.get(i).load(row.get(i));
+    }
+    writeMetadata();
+    rowWriter.save();
+  }
+
+  @Override
+  public void close() {
+    if (fsStream != null) {
+      AutoCloseables.closeSilently(fsStream);
+      fsStream = null;
+    }
+
+    if (document != null) {
+      AutoCloseables.closeSilently(document.getDocument());
+      AutoCloseables.closeSilently(document);
+      document = null;
+    }
+  }
+
+  /**
+   * This method opens the PDF file, and finds the tables
+   */
+  private void openFile() {
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      document = PDDocument.load(fsStream);
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: %s", split.getPath().toString())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
+  /**
+   * Metadata fields are calculated once when the file is opened.  This function populates
+   * the metadata fields so that these are only calculated once.
+   */
+  private void populateMetadata() {
+    PDDocumentInformation info = document.getDocumentInformation();
+    pageCount = document.getNumberOfPages();
+    title = info.getTitle();
+    author = info.getAuthor();
+    subject = info.getSubject();
+    keywords = info.getKeywords();
+    creator = info.getCreator();
+    producer = info.getProducer();
+    creationDate = info.getCreationDate();
+    modificationDate = info.getModificationDate();
+    trapped = info.getTrapped();
+    tableCount = tables.size();
+  }
+
+  private void addImplicitColumnsToSchema() {
+    metadataIndex = columns;
+    // Add to schema
+    addMetadataColumnToSchema("_page_count", MinorType.INT);
+    addMetadataColumnToSchema("_title", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_author", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_subject", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_keywords", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_creator", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_producer", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_creation_date", MinorType.TIMESTAMP);
+    addMetadataColumnToSchema("_modification_date", MinorType.TIMESTAMP);
+    addMetadataColumnToSchema("_trapped", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_table_count", MinorType.INT);
+  }
+
+  private void addMetadataColumnToSchema(String columnName, MinorType dataType) {
+    int index = rowWriter.tupleSchema().index(columnName);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(columnName, dataType, DataMode.OPTIONAL);
+
+      // Exclude from wildcard queries
+      colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+      metadataIndex++;
+
+      index = rowWriter.addColumn(colSchema);
+    }
+
+    writers.add(new StringPdfColumnWriter(index, columnName, rowWriter));

Review comment:
       Again, we want to do this while building the schema, not on the live writer.

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/Utils.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.ObjectExtractor;
+import technology.tabula.Page;
+import technology.tabula.PageIterator;
+import technology.tabula.Rectangle;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+import technology.tabula.detectors.NurminenDetectionAlgorithm;
+import technology.tabula.extractors.BasicExtractionAlgorithm;
+import technology.tabula.extractors.ExtractionAlgorithm;
+import technology.tabula.extractors.SpreadsheetExtractionAlgorithm;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Utils {
+
+  public static final ExtractionAlgorithm DEFAULT_ALGORITHM = new BasicExtractionAlgorithm();
+  private static final Logger logger = LoggerFactory.getLogger(Utils.class);
+
+  /**
+   * Returns a list of tables found in a given PDF document.  There are several extraction algorithms
+   * available and this function uses the default Basic Extraction Algorithm.
+   * @param document The input PDF document to search for tables
+   * @return A list of tables found in the document.
+   */
+  public static List<Table> extractTablesFromPDF(PDDocument document) {
+    return extractTablesFromPDF(document, DEFAULT_ALGORITHM);
+  }
+
+  public static List<Table> extractTablesFromPDF(PDDocument document, ExtractionAlgorithm algorithm) {
+    System.setProperty("java.awt.headless", "true");
+
+    NurminenDetectionAlgorithm detectionAlgorithm = new NurminenDetectionAlgorithm();
+
+    ExtractionAlgorithm algExtractor;
+
+    SpreadsheetExtractionAlgorithm extractor = new SpreadsheetExtractionAlgorithm();
+
+    ObjectExtractor objectExtractor = new ObjectExtractor(document);
+    PageIterator pages = objectExtractor.extract();
+    List<Table> tables= new ArrayList<>();

Review comment:
       Nit: missing space before `=`.

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);
+    populateMetadata();
+    logger.debug("Found {} tables", tables.size());
+
+    // Support provided schema
+    TupleMetadata schema = null;
+    if (this.negotiator.hasProvidedSchema()) {
+      schema = this.negotiator.providedSchema();
+      this.negotiator.tableSchema(schema, false);
+    } else {
+      this.negotiator.tableSchema(buildSchema(), false);
+    }
+    ResultSetLoader loader = this.negotiator.build();
+    rowWriter = loader.writer();
+
+    if (negotiator.hasProvidedSchema()) {
+      buildWriterListFromProvidedSchema(schema);
+    } else {
+      buildWriterList();
+    }
+    addImplicitColumnsToSchema();
+
+    // Prepare for reading
+    currentRowIndex = 1;  // Skip the first line if there are headers
+    currentTable = tables.get(startingTableIndex);
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    System.setProperty("java.awt.headless", "true");
+
+    while(!rowWriter.isFull()) {
+      // Check to see if the limit has been reached
+      if (rowWriter.limitReached(maxRecords)) {
+        return false;
+      } else if (currentRowIndex >= currentTable.getRows().size() &&
+                  currentTableIndex < tables.size() &&
+                  config.plugin.getConfig().getCombinePages()) {
+        currentRowIndex = 0;
+        currentTable = tables.get(currentTableIndex++);
+      } else if (currentRowIndex >= currentTable.getRows().size()) {
+        return false;
+      }
+
+      // Process the row
+      processRow(currentTable.getRows().get(currentRowIndex));
+      currentRowIndex++;
+    }
+    return true;
+  }
+
+  private void processRow(List<RectangularTextContainer> row) {
+    if (row == null || row.size() == 0) {
+      return;
+    }
+
+    String value;
+    rowWriter.start();
+    for (int i = 0; i < row.size(); i++) {
+      value = row.get(i).getText();
+
+      if (Strings.isNullOrEmpty(value)) {
+        continue;
+      }
+      writers.get(i).load(row.get(i));
+    }
+    writeMetadata();
+    rowWriter.save();
+  }
+
+  @Override
+  public void close() {
+    if (fsStream != null) {

Review comment:
       Given that the file is only read in `openFile()`, there's no need to keep the file stream around: just close it there.
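
   That is, something like this sketch (`PDDocument.load()` consumes the whole stream up front, so try-with-resources is safe here):

   ```java
   // Sketch: load the document and release the stream immediately.
   private void openFile() {
     try (InputStream in =
            negotiator.fileSystem().openPossiblyCompressedStream(split.getPath())) {
       document = PDDocument.load(in);
     } catch (Exception e) {
       throw UserException.dataReadError(e)
         .message("Failed to open input file: %s", split.getPath())
         .addContext(errorContext)
         .build(logger);
     }
   }
   ```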

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);
+    populateMetadata();
+    logger.debug("Found {} tables", tables.size());
+
+    // Support provided schema
+    TupleMetadata schema = null;
+    if (this.negotiator.hasProvidedSchema()) {
+      schema = this.negotiator.providedSchema();
+      this.negotiator.tableSchema(schema, false);
+    } else {
+      this.negotiator.tableSchema(buildSchema(), false);
+    }
+    ResultSetLoader loader = this.negotiator.build();
+    rowWriter = loader.writer();
+
+    if (negotiator.hasProvidedSchema()) {
+      buildWriterListFromProvidedSchema(schema);
+    } else {
+      buildWriterList();
+    }
+    addImplicitColumnsToSchema();
+
+    // Prepare for reading
+    currentRowIndex = 1;  // Skip the first line if there are headers
+    currentTable = tables.get(startingTableIndex);
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    System.setProperty("java.awt.headless", "true");
+
+    while (!rowWriter.isFull()) {
+      // Check to see if the limit has been reached
+      if (rowWriter.limitReached(maxRecords)) {
+        return false;
+      } else if (currentRowIndex >= currentTable.getRows().size() &&
+                  currentTableIndex < tables.size() &&
+                  config.plugin.getConfig().getCombinePages()) {
+        currentRowIndex = 0;
+        currentTable = tables.get(currentTableIndex++);
+      } else if (currentRowIndex >= currentTable.getRows().size()) {
+        return false;
+      }
+
+      // Process the row
+      processRow(currentTable.getRows().get(currentRowIndex));
+      currentRowIndex++;
+    }
+    return true;
+  }
+
+  private void processRow(List<RectangularTextContainer> row) {
+    if (row == null || row.size() == 0) {
+      return;
+    }
+
+    String value;
+    rowWriter.start();
+    for (int i = 0; i < row.size(); i++) {
+      value = row.get(i).getText();
+
+      if (Strings.isNullOrEmpty(value)) {
+        continue;
+      }
+      writers.get(i).load(row.get(i));
+    }
+    writeMetadata();
+    rowWriter.save();
+  }
+
+  @Override
+  public void close() {
+    if (fsStream != null) {
+      AutoCloseables.closeSilently(fsStream);
+      fsStream = null;
+    }
+
+    if (document != null) {
+      AutoCloseables.closeSilently(document.getDocument());
+      AutoCloseables.closeSilently(document);
+      document = null;
+    }
+  }
+
+  /**
+   * Opens the PDF file and loads it into a {@link PDDocument}; table discovery happens in {@code open()}.
+   */
+  private void openFile() {
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      document = PDDocument.load(fsStream);
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open open input file: %s", split.getPath().toString())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
+  /**
+   * Document metadata fields are calculated once, when the file is opened.  This method
+   * populates those fields so that they are not recomputed for every row.
+   */
+  private void populateMetadata() {
+    PDDocumentInformation info = document.getDocumentInformation();
+    pageCount = document.getNumberOfPages();
+    title = info.getTitle();
+    author = info.getAuthor();
+    subject = info.getSubject();
+    keywords = info.getKeywords();
+    creator = info.getCreator();
+    producer = info.getProducer();
+    creationDate = info.getCreationDate();
+    modificationDate = info.getModificationDate();
+    trapped = info.getTrapped();
+    tableCount = tables.size();
+  }
+
+  private void addImplicitColumnsToSchema() {
+    metadataIndex = columns;
+    // Add to schema
+    addMetadataColumnToSchema("_page_count", MinorType.INT);
+    addMetadataColumnToSchema("_title", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_author", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_subject", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_keywords", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_creator", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_producer", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_creation_date", MinorType.TIMESTAMP);
+    addMetadataColumnToSchema("_modification_date", MinorType.TIMESTAMP);
+    addMetadataColumnToSchema("_trapped", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_table_count", MinorType.INT);
+  }
+
+  private void addMetadataColumnToSchema(String columnName, MinorType dataType) {
+    int index = rowWriter.tupleSchema().index(columnName);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(columnName, dataType, DataMode.OPTIONAL);
+
+      // Exclude from wildcard queries
+      colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+      metadataIndex++;
+
+      index = rowWriter.addColumn(colSchema);
+    }
+
+    writers.add(new StringPdfColumnWriter(index, columnName, rowWriter));
+  }
+
+  private void writeMetadata() {
+    int startingIndex = columnHeaders.size();
+    writers.get(startingIndex).getWriter().setInt(pageCount);
+    writeStringMetadataField(title, startingIndex+1);
+    writeStringMetadataField(author, startingIndex+2);
+    writeStringMetadataField(subject, startingIndex+3);
+    writeStringMetadataField(keywords, startingIndex+4);
+    writeStringMetadataField(creator, startingIndex+5);
+    writeStringMetadataField(producer, startingIndex+6);
+    writeTimestampMetadataField(creationDate, startingIndex+7);
+    writeTimestampMetadataField(modificationDate, startingIndex+8);
+    writeStringMetadataField(trapped, startingIndex+9);
+    writers.get(startingIndex+10).getWriter().setInt(tableCount);
+  }
+
+  private void writeStringMetadataField(String value, int index) {
+    if (value == null) {
+      return;
+    }
+    writers.get(index).getWriter().setString(value);

Review comment:
       Cache the writers in the proposed Metadata class, and cache the values in an array. Then, for speed:
   
   ```java
     for (int i = 0; i < columns.length; i++) {
       writers[i].setObject(columns[i]);
     }
   ```
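   
   For concreteness, a rough sketch of what that could look like (the `PdfMetadata` name and the parallel `writers`/`values` arrays are illustrative, not something already in this PR):
   
   ```java
     // Illustrative sketch only: gather the metadata values once at open time,
     // cache one ScalarWriter per metadata column, then write each row with a
     // flat loop instead of a chain of per-field helper calls.
     private static class PdfMetadata {
       private final ScalarWriter[] writers;
       private final Object[] values;
   
       PdfMetadata(ScalarWriter[] writers, Object[] values) {
         this.writers = writers;
         this.values = values;
       }
   
       void writeRow() {
         for (int i = 0; i < writers.length; i++) {
           if (values[i] != null) {  // PDF metadata fields are often absent
             writers[i].setObject(values[i]);
           }
         }
       }
     }
   ```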

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatConfig.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+@JsonTypeName(PdfFormatPlugin.DEFAULT_NAME)
+public class PdfFormatConfig implements FormatPluginConfig {
+
+  private final List<String> extensions;
+  private final boolean extractHeaders;
+  private final String extractionAlgorithm;
+  private final boolean combinePages;
+  private final int defaultTableIndex;
+
+  @JsonCreator
+  public PdfFormatConfig(@JsonProperty("extensions") List<String> extensions,
+                         @JsonProperty("extractHeaders") boolean extractHeaders,
+                         @JsonProperty("extractionAlgorithm") String extractionAlgorithm,
+                         @JsonProperty("combinePages") boolean combinePages,
+                         @JsonProperty("defaultTableIndex") int defaultTableIndex) {
+    this.extensions = extensions == null
+      ? Collections.singletonList("pdf")
+      : ImmutableList.copyOf(extensions);
+    this.extractHeaders = extractHeaders;
+    this.extractionAlgorithm = extractionAlgorithm;
+    this.combinePages = combinePages;
+    this.defaultTableIndex = defaultTableIndex;
+  }
+
+  public PdfBatchReader.PdfReaderConfig getReaderConfig(PdfFormatPlugin plugin) {
+    return new PdfBatchReader.PdfReaderConfig(plugin);
+  }
+
+  @JsonInclude(Include.NON_DEFAULT)
+  @JsonProperty("extensions")
+  public List<String> getExtensions() {
+    return extensions;
+  }
+
+  @JsonInclude(Include.NON_DEFAULT)
+  @JsonProperty("combinePages")
+  public boolean getCombinePages() { return combinePages; }
+
+  @JsonInclude(Include.NON_DEFAULT)
+  @JsonProperty("extractHeaders")
+  public boolean getExtractHeaders() {
+    return extractHeaders;
+  }
+
+  @JsonProperty("extractionAlgorithm")

Review comment:
       `@JsonInclude(Include.NON_NULL)`?
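   
   That is, assuming the getter mirrors the other accessors in this class (its body falls outside the quoted hunk, so this is a sketch):
   
   ```java
     @JsonInclude(Include.NON_NULL)
     @JsonProperty("extractionAlgorithm")
     public String getExtractionAlgorithm() {
       return extractionAlgorithm;
     }
   ```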

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+  private void writeStringMetadataField(String value, int index) {
+    if (value == null) {
+      return;
+    }
+    writers.get(index).getWriter().setString(value);
+  }
+
+  private void writeTimestampMetadataField(Calendar dateValue, int index) {
+    if (dateValue == null) {
+      return;
+    }
+
+    writers.get(index).getWriter().setTimestamp(Instant.ofEpochMilli(dateValue.getTimeInMillis()));
+  }
+
+  private void addUnknownColumnToSchemaAndCreateWriter (TupleWriter rowWriter, String name) {
+    int index = rowWriter.tupleSchema().index(name);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
+      index = rowWriter.addColumn(colSchema);
+    }
+    ScalarWriter colWriter = rowWriter.scalar(index);
+
+    // Create a new column name which will be field_n
+    String newColumnName = NEW_FIELD_PREFIX + unregisteredColumnCount;
+    unregisteredColumnCount++;
+
+    // Add a new writer.  Since we want the metadata always to be at the end of the schema, we must track the metadata
+    // index and add this before the metadata, so that the column index tracks with the writer index.
+    writers.add(metadataIndex, new StringPdfColumnWriter(1, newColumnName, (RowSetLoader) rowWriter));
+
+    // Increment the metadata index
+    metadataIndex++;
+  }
+
+  private TupleMetadata buildSchema() {
+    Table table = tables.get(startingTableIndex);
+    columns = table.getColCount();
+    rows = table.getRowCount();
+
+    // Get column header names
+    columnHeaders = Utils.extractRowValues(table);
+
+    // Add columns to table
+    int index = 0;
+    for (String columnName : columnHeaders) {
+      if (Strings.isNullOrEmpty(columnName)) {
+        columnName = NEW_FIELD_PREFIX + unregisteredColumnCount;
+        columnHeaders.set(index, columnName);
+        unregisteredColumnCount++;
+      }
+      builder.addNullable(columnName, MinorType.VARCHAR);
+      index++;
+    }
+
+    return builder.buildSchema();
+  }
+
+  private void buildWriterList() {
+    for (String header : columnHeaders) {
+      writers.add(new StringPdfColumnWriter(columnHeaders.indexOf(header), header, rowWriter));
+    }
+  }
+
+  private void buildWriterListFromProvidedSchema(TupleMetadata schema) {
+    if (schema == null) {
+      buildWriterList();
+      return;
+    }
+    int counter = 0;
+    for (MaterializedField field: schema.toFieldList()) {
+      String fieldName = field.getName();
+      MinorType type = field.getType().getMinorType();
+      columnHeaders.add(fieldName);
+
+      switch (type) {
+        case VARCHAR:
+          writers.add(new StringPdfColumnWriter(counter, fieldName, rowWriter));
+          break;
+        case SMALLINT:
+        case TINYINT:
+        case INT:
+          writers.add(new IntPdfColumnWriter(counter, fieldName, rowWriter));
+          break;
+        case BIGINT:
+          writers.add(new BigIntPdfColumnWriter(counter, fieldName, rowWriter));
+          break;
+        case FLOAT4:
+        case FLOAT8:
+          writers.add(new DoublePdfColumnWriter(counter, fieldName, rowWriter));
+          break;
+        case DATE:
+          writers.add(new DatePdfColumnWriter(counter, fieldName, rowWriter, negotiator));
+          break;
+        case TIME:
+          writers.add(new TimePdfColumnWriter(counter, fieldName, rowWriter, negotiator));
+          break;
+        case TIMESTAMP:
+          writers.add(new TimestampPdfColumnWriter(counter, fieldName, rowWriter, negotiator));
+          break;
+        default:
+          throw UserException.unsupportedError()
+            .message("PDF Reader with provided schema does not support " + type.name() + " data type.")
+            .addContext(errorContext)
+            .build(logger);
+      }
+    }
+  }
+
+  public abstract static class PdfColumnWriter {
+    final String columnName;
+    final ScalarWriter writer;
+    final int columnIndex;
+
+    public PdfColumnWriter(int columnIndex, String columnName, ScalarWriter writer) {
+      this.columnIndex = columnIndex;
+      this.columnName = columnName;
+      this.writer = writer;
+    }
+
+    public abstract void load (RectangularTextContainer<?> cell);
+
+    public ScalarWriter getWriter() {
+      return writer;
+    }
+  }
+
+  public static class IntPdfColumnWriter extends PdfColumnWriter {
+    IntPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+    }
+
+    @Override
+    public void load(RectangularTextContainer<?> cell) {
+      writer.setInt(Integer.parseInt(cell.getText()));
+    }
+  }
+
+  public static class BigIntPdfColumnWriter extends PdfColumnWriter {
+    BigIntPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+    }
+
+    @Override
+    public void load(RectangularTextContainer<?> cell) {
+      writer.setLong(Long.parseLong(cell.getText()));
+    }
+  }
+
+  public static class DoublePdfColumnWriter extends PdfColumnWriter {
+    DoublePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+    }
+
+    @Override
+    public void load(RectangularTextContainer<?> cell) {
+      writer.setDouble(Double.parseDouble(cell.getText()));
+    }
+  }
+
+  public static class StringPdfColumnWriter extends PdfColumnWriter {
+    StringPdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+    }
+
+    @Override
+    public void load(RectangularTextContainer<?> cell) {
+      writer.setString(cell.getText());
+    }
+  }
+
+  public static class DatePdfColumnWriter extends PdfColumnWriter {
+    private String dateFormat;
+
+    DatePdfColumnWriter (int columnIndex, String columnName, RowSetLoader rowWriter, FileScanFramework.FileSchemaNegotiator negotiator) {
+      super(columnIndex, columnName, rowWriter.scalar(columnName));
+
+      ColumnMetadata metadata = negotiator.providedSchema().metadata(columnName);
+      if (metadata != null) {
+        this.dateFormat = metadata.property("drill.format");
+      }
+    }
+
+    @Override
+    public void load(RectangularTextContainer<?> cell) {
+      LocalDate localDate;
+      if (Strings.isNullOrEmpty(this.dateFormat)) {
+       localDate = LocalDate.parse(cell.getText());

Review comment:
       Nit: missing leading space.

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+  private void addImplicitColumnsToSchema() {
+    metadataIndex = columns;
+    // Add to schema
+    addMetadataColumnToSchema("_page_count", MinorType.INT);

Review comment:
       Do this in the proposed metadata class.
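   
   Something along these lines, say (sketch only; `addColumn` is a hypothetical helper wrapping the `MetadataUtils.newScalar` plus `EXCLUDE_FROM_WILDCARD` logic above):
   
   ```java
     // Sketch: the proposed metadata class owns the implicit-column layout,
     // so the reader registers all of the columns with a single call.
     void addColumnsTo(RowSetLoader rowWriter) {
       addColumn(rowWriter, "_page_count", MinorType.INT);
       addColumn(rowWriter, "_title", MinorType.VARCHAR);
       addColumn(rowWriter, "_author", MinorType.VARCHAR);
       // ... and so on for the remaining document-metadata columns
     }
   ```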

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+  private void processRow(List<RectangularTextContainer> row) {
+    if (row == null || row.size() == 0) {
+      return;
+    }
+
+    String value;
+    rowWriter.start();
+    for (int i = 0; i < row.size(); i++) {
+      value = row.get(i).getText();
+
+      if (Strings.isNullOrEmpty(value)) {
+        continue;
+      }
+      writers.get(i).load(row.get(i));
+    }
+    writeMetadata();

Review comment:
       Here's where we've got an issue: we can't return the metadata without there being tables, and without having fetched them first.
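   
   To make the gap concrete: `open()` calls `tables.get(startingTableIndex)` before a single row is written, so a PDF whose metadata is perfectly readable still fails outright when no tables are detected. An illustrative guard (not in the PR):
   
   ```java
     // Illustrative only: without a guard like this, a PDF with readable
     // metadata but no detectable tables throws IndexOutOfBoundsException
     // instead of ever returning its metadata.
     if (tables.isEmpty()) {
       logger.debug("No tables found in {}", split.getPath());
       return false;  // or emit one metadata-only row before finishing
     }
   ```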

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+  private void buildWriterList() {

Review comment:
       We build the writers two ways, depending on whether a schema was provided. With V2, the idea is to build the schema once, give it to the `SchemaNegotiator`, and get back a writer with all the needed columns. Adding columns to a writer on the fly is a bit old-school, needed only for formats, such as JSON, where we really don't know the schema until we read a record.
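   
   Roughly this shape (a sketch under that assumption; `writerFor` is a hypothetical factory that picks the right `PdfColumnWriter` for a column's type):
   
   ```java
     // Sketch: one construction path regardless of where the schema came from.
     TupleMetadata schema = negotiator.hasProvidedSchema()
         ? negotiator.providedSchema()
         : buildSchema();
     negotiator.tableSchema(schema, false);
     RowSetLoader rowWriter = negotiator.build().writer();
     for (MaterializedField field : schema.toFieldList()) {
       writers.add(writerFor(field, rowWriter));  // hypothetical per-type factory
     }
   ```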

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);
+    populateMetadata();
+    logger.debug("Found {} tables", tables.size());
+
+    // Support provided schema
+    TupleMetadata schema = null;
+    if (this.negotiator.hasProvidedSchema()) {
+      schema = this.negotiator.providedSchema();
+      this.negotiator.tableSchema(schema, false);
+    } else {
+      this.negotiator.tableSchema(buildSchema(), false);
+    }
+    ResultSetLoader loader = this.negotiator.build();
+    rowWriter = loader.writer();
+
+    if (negotiator.hasProvidedSchema()) {
+      buildWriterListFromProvidedSchema(schema);
+    } else {
+      buildWriterList();
+    }
+    addImplicitColumnsToSchema();
+
+    // Prepare for reading
+    currentRowIndex = 1;  // Skip the first line if there are headers
+    currentTable = tables.get(startingTableIndex);
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    System.setProperty("java.awt.headless", "true");
+
+    while(!rowWriter.isFull()) {
+      // Check to see if the limit has been reached
+      if (rowWriter.limitReached(maxRecords)) {
+        return false;
+      } else if (currentRowIndex >= currentTable.getRows().size() &&
+                  currentTableIndex < tables.size() &&
+                  config.plugin.getConfig().getCombinePages()) {
+        currentRowIndex = 0;
+        currentTable = tables.get(currentTableIndex++);
+      } else if (currentRowIndex >= currentTable.getRows().size()) {
+        return false;
+      }
+
+      // Process the row
+      processRow(currentTable.getRows().get(currentRowIndex));
+      currentRowIndex++;
+    }
+    return true;
+  }
+
+  private void processRow(List<RectangularTextContainer> row) {
+    if (row == null || row.size() == 0) {
+      return;
+    }
+
+    String value;
+    rowWriter.start();
+    for (int i = 0; i < row.size(); i++) {
+      value = row.get(i).getText();
+
+      if (Strings.isNullOrEmpty(value)) {
+        continue;
+      }
+      writers.get(i).load(row.get(i));
+    }
+    writeMetadata();
+    rowWriter.save();
+  }
+
+  @Override
+  public void close() {
+    if (fsStream != null) {
+      AutoCloseables.closeSilently(fsStream);
+      fsStream = null;
+    }
+
+    if (document != null) {
+      AutoCloseables.closeSilently(document.getDocument());
+      AutoCloseables.closeSilently(document);
+      document = null;
+    }
+  }
+
+  /**
+   * This method opens the PDF file, and finds the tables
+   */
+  private void openFile() {
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      document = PDDocument.load(fsStream);
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open open input file: %s", split.getPath().toString())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+  }
+
+  /**
+   * Metadata fields are calculated once when the file is opened.  This function populates
+   * the metadata fields so that these are only calculated once.
+   */
+  private void populateMetadata() {
+    PDDocumentInformation info = document.getDocumentInformation();
+    pageCount = document.getNumberOfPages();
+    title = info.getTitle();
+    author = info.getAuthor();
+    subject = info.getSubject();
+    keywords = info.getKeywords();
+    creator = info.getCreator();
+    producer = info.getProducer();
+    creationDate = info.getCreationDate();
+    modificationDate = info.getModificationDate();
+    trapped = info.getTrapped();
+    tableCount = tables.size();
+  }
+
+  private void addImplicitColumnsToSchema() {
+    metadataIndex = columns;
+    // Add to schema
+    addMetadataColumnToSchema("_page_count", MinorType.INT);
+    addMetadataColumnToSchema("_title", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_author", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_subject", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_keywords", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_creator", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_producer", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_creation_date", MinorType.TIMESTAMP);
+    addMetadataColumnToSchema("_modification_date", MinorType.TIMESTAMP);
+    addMetadataColumnToSchema("_trapped", MinorType.VARCHAR);
+    addMetadataColumnToSchema("_table_count", MinorType.INT);
+  }
+
+  private void addMetadataColumnToSchema(String columnName, MinorType dataType) {
+    int index = rowWriter.tupleSchema().index(columnName);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(columnName, dataType, DataMode.OPTIONAL);
+
+      // Exclude from wildcard queries
+      colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+      metadataIndex++;
+
+      index = rowWriter.addColumn(colSchema);
+    }
+
+    writers.add(new StringPdfColumnWriter(index, columnName, rowWriter));
+  }
+
+  private void writeMetadata() {
+    int startingIndex = columnHeaders.size();
+    writers.get(startingIndex).getWriter().setInt(pageCount);
+    writeStringMetadataField(title, startingIndex+1);
+    writeStringMetadataField(author, startingIndex+2);
+    writeStringMetadataField(subject, startingIndex+3);
+    writeStringMetadataField(keywords, startingIndex+4);
+    writeStringMetadataField(creator, startingIndex+5);
+    writeStringMetadataField(producer, startingIndex+6);
+    writeTimestampMetadataField(creationDate, startingIndex+7);
+    writeTimestampMetadataField(modificationDate, startingIndex+8);
+    writeStringMetadataField(trapped, startingIndex+9);
+    writers.get(startingIndex+10).getWriter().setInt(tableCount);
+  }
+
+  private void writeStringMetadataField(String value, int index) {
+    if (value == null) {
+      return;
+    }
+    writers.get(index).getWriter().setString(value);
+  }
+
+  private void writeTimestampMetadataField(Calendar dateValue, int index) {
+    if (dateValue == null) {
+      return;
+    }
+
+    writers.get(index).getWriter().setTimestamp(Instant.ofEpochMilli(dateValue.getTimeInMillis()));
+  }
+
+  private void addUnknownColumnToSchemaAndCreateWriter (TupleWriter rowWriter, String name) {
+    int index = rowWriter.tupleSchema().index(name);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(name, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
+      index = rowWriter.addColumn(colSchema);
+    }
+    ScalarWriter colWriter = rowWriter.scalar(index);
+
+    // Create a new column name which will be field_n
+    String newColumnName = NEW_FIELD_PREFIX + unregisteredColumnCount;
+    unregisteredColumnCount++;
+
+    // Add a new writer.  Since we want the metadata always to be at the end of the schema, we must track the metadata
+    // index and add this before the metadata, so that the column index tracks with the writer index.
+    writers.add(metadataIndex, new StringPdfColumnWriter(1, newColumnName, (RowSetLoader) rowWriter));
+
+    // Increment the metadata index
+    metadataIndex++;
+  }
+
+  private TupleMetadata buildSchema() {
+    Table table = tables.get(startingTableIndex);
+    columns = table.getColCount();
+    rows = table.getRowCount();
+
+    // Get column header names
+    columnHeaders = Utils.extractRowValues(table);
+
+    // Add columns to table
+    int index = 0;
+    for (String columnName : columnHeaders) {
+      if (Strings.isNullOrEmpty(columnName)) {
+        columnName = NEW_FIELD_PREFIX + unregisteredColumnCount;
+        columnHeaders.set(index, columnName);
+        unregisteredColumnCount++;
+      }
+      builder.addNullable(columnName, MinorType.VARCHAR);
+      index++;
+    }
+
+    return builder.buildSchema();
+  }
+
+  private void buildWriterList() {
+    for (String header : columnHeaders) {
+      writers.add(new StringPdfColumnWriter(columnHeaders.indexOf(header), header, rowWriter));
+    }
+  }
+
+  private void buildWriterListFromProvidedSchema(TupleMetadata schema) {
+    if (schema == null) {
+      buildWriterList();
+      return;
+    }
+    int counter = 0;
+    for (MaterializedField field: schema.toFieldList()) {
+      String fieldName = field.getName();
+      MinorType type = field.getType().getMinorType();
+      columnHeaders.add(fieldName);
+
+      switch (type) {
+        case VARCHAR:
+          writers.add(new StringPdfColumnWriter(counter, fieldName, rowWriter));

Review comment:
       V2 does this for you: builds writers based on the declared field type.

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/Utils.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.ObjectExtractor;
+import technology.tabula.Page;
+import technology.tabula.PageIterator;
+import technology.tabula.Rectangle;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+import technology.tabula.detectors.NurminenDetectionAlgorithm;
+import technology.tabula.extractors.BasicExtractionAlgorithm;
+import technology.tabula.extractors.ExtractionAlgorithm;
+import technology.tabula.extractors.SpreadsheetExtractionAlgorithm;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Utils {

Review comment:
       As far as I can tell, the only utility here is the table extractor. Should the class be named `TableExtractor` instead?

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/Utils.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.ObjectExtractor;
+import technology.tabula.Page;
+import technology.tabula.PageIterator;
+import technology.tabula.Rectangle;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+import technology.tabula.detectors.NurminenDetectionAlgorithm;
+import technology.tabula.extractors.BasicExtractionAlgorithm;
+import technology.tabula.extractors.ExtractionAlgorithm;
+import technology.tabula.extractors.SpreadsheetExtractionAlgorithm;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Utils {
+
+  public static final ExtractionAlgorithm DEFAULT_ALGORITHM = new BasicExtractionAlgorithm();
+  private static final Logger logger = LoggerFactory.getLogger(Utils.class);
+
+  /**
+   * Returns a list of tables found in a given PDF document.  There are several extraction algorithms
+   * available and this function uses the default Basic Extraction Algorithm.
+   * @param document The input PDF document to search for tables
+   * @return A list of tables found in the document.
+   */
+  public static List<Table> extractTablesFromPDF(PDDocument document) {
+    return extractTablesFromPDF(document, DEFAULT_ALGORITHM);
+  }
+
+  public static List<Table> extractTablesFromPDF(PDDocument document, ExtractionAlgorithm algorithm) {
+    System.setProperty("java.awt.headless", "true");
+
+    NurminenDetectionAlgorithm detectionAlgorithm = new NurminenDetectionAlgorithm();
+
+    ExtractionAlgorithm algExtractor;
+
+    SpreadsheetExtractionAlgorithm extractor = new SpreadsheetExtractionAlgorithm();
+
+    ObjectExtractor objectExtractor = new ObjectExtractor(document);
+    PageIterator pages = objectExtractor.extract();
+    List<Table> tables= new ArrayList<>();
+    while (pages.hasNext()) {
+      Page page = pages.next();
+
+      algExtractor = algorithm;
+      List<Rectangle> tablesOnPage = detectionAlgorithm.detect(page);
+
+      for (Rectangle guessRect : tablesOnPage) {
+        Page guess = page.getArea(guessRect);
+        tables.addAll(algExtractor.extract(guess));
+      }
+    }
+
+    try {
+      objectExtractor.close();
+    } catch (Exception e) {
+      logger.debug("Error closing Object extractor.");

Review comment:
       Should we be doing something useful with these exceptions, such as telling the user that the extraction failed?
   
    This line should also say which file failed. Otherwise, in a Drill query that processed 100 PDF files, how will we know where to look for problems?
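   
    A sketch, assuming the file path gets passed down from the reader (the current method only receives the document, so `filePath` here is a hypothetical extra parameter):
   
    ```java
    try {
      objectExtractor.close();
    } catch (Exception e) {
      // Include the offending file and the cause, not just a fixed string.
      logger.warn("Error closing object extractor for file {}", filePath, e);
    }
    ```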

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/Utils.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.ObjectExtractor;
+import technology.tabula.Page;
+import technology.tabula.PageIterator;
+import technology.tabula.Rectangle;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+import technology.tabula.detectors.NurminenDetectionAlgorithm;
+import technology.tabula.extractors.BasicExtractionAlgorithm;
+import technology.tabula.extractors.ExtractionAlgorithm;
+import technology.tabula.extractors.SpreadsheetExtractionAlgorithm;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Utils {
+
+  public static final ExtractionAlgorithm DEFAULT_ALGORITHM = new BasicExtractionAlgorithm();
+  private static final Logger logger = LoggerFactory.getLogger(Utils.class);
+
+  /**
+   * Returns a list of tables found in a given PDF document.  There are several extraction algorithms
+   * available and this function uses the default Basic Extraction Algorithm.
+   * @param document The input PDF document to search for tables
+   * @return A list of tables found in the document.
+   */
+  public static List<Table> extractTablesFromPDF(PDDocument document) {
+    return extractTablesFromPDF(document, DEFAULT_ALGORITHM);
+  }
+
+  public static List<Table> extractTablesFromPDF(PDDocument document, ExtractionAlgorithm algorithm) {
+    System.setProperty("java.awt.headless", "true");

Review comment:
       Again, we probably shouldn't be doing this.

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/Utils.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.ObjectExtractor;
+import technology.tabula.Page;
+import technology.tabula.PageIterator;
+import technology.tabula.Rectangle;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+import technology.tabula.detectors.NurminenDetectionAlgorithm;
+import technology.tabula.extractors.BasicExtractionAlgorithm;
+import technology.tabula.extractors.ExtractionAlgorithm;
+import technology.tabula.extractors.SpreadsheetExtractionAlgorithm;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Utils {
+
+  public static final ExtractionAlgorithm DEFAULT_ALGORITHM = new BasicExtractionAlgorithm();
+  private static final Logger logger = LoggerFactory.getLogger(Utils.class);
+
+  /**
+   * Returns a list of tables found in a given PDF document.  There are several extraction algorithms
+   * available and this function uses the default Basic Extraction Algorithm.

Review comment:
       Maybe reference where this algorithm came from? What are the other algorithms?
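   
    For example, the javadoc could note that these come from Tabula, and that the alternative to `BasicExtractionAlgorithm` (text-position based) is `SpreadsheetExtractionAlgorithm` (ruling-line based). A config-driven selection might look like this sketch, where the `"basic"`/`"spreadsheet"` value names are assumptions:
   
    ```java
    // Sketch: map a config string onto Tabula's two extraction algorithms.
    public static ExtractionAlgorithm algorithmFor(String name) {
      if ("spreadsheet".equalsIgnoreCase(name)) {
        return new SpreadsheetExtractionAlgorithm();  // tables with ruling lines
      }
      return new BasicExtractionAlgorithm();  // default: text positions/whitespace
    }
    ```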

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/Utils.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.ObjectExtractor;
+import technology.tabula.Page;
+import technology.tabula.PageIterator;
+import technology.tabula.Rectangle;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+import technology.tabula.detectors.NurminenDetectionAlgorithm;
+import technology.tabula.extractors.BasicExtractionAlgorithm;
+import technology.tabula.extractors.ExtractionAlgorithm;
+import technology.tabula.extractors.SpreadsheetExtractionAlgorithm;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Utils {
+
+  public static final ExtractionAlgorithm DEFAULT_ALGORITHM = new BasicExtractionAlgorithm();
+  private static final Logger logger = LoggerFactory.getLogger(Utils.class);
+
+  /**
+   * Returns a list of tables found in a given PDF document.  There are several extraction algorithms
+   * available and this function uses the default Basic Extraction Algorithm.
+   * @param document The input PDF document to search for tables
+   * @return A list of tables found in the document.
+   */
+  public static List<Table> extractTablesFromPDF(PDDocument document) {
+    return extractTablesFromPDF(document, DEFAULT_ALGORITHM);
+  }
+
+  public static List<Table> extractTablesFromPDF(PDDocument document, ExtractionAlgorithm algorithm) {
+    System.setProperty("java.awt.headless", "true");
+
+    NurminenDetectionAlgorithm detectionAlgorithm = new NurminenDetectionAlgorithm();
+
+    ExtractionAlgorithm algExtractor;
+
+    SpreadsheetExtractionAlgorithm extractor = new SpreadsheetExtractionAlgorithm();
+
+    ObjectExtractor objectExtractor = new ObjectExtractor(document);
+    PageIterator pages = objectExtractor.extract();
+    List<Table> tables= new ArrayList<>();
+    while (pages.hasNext()) {
+      Page page = pages.next();
+
+      algExtractor = algorithm;
+      List<Rectangle> tablesOnPage = detectionAlgorithm.detect(page);
+
+      for (Rectangle guessRect : tablesOnPage) {
+        Page guess = page.getArea(guessRect);
+        tables.addAll(algExtractor.extract(guess));
+      }
+    }
+
+    try {
+      objectExtractor.close();

Review comment:
       How critical is this `close()`? Should it be in a `finally` block in case any of the above lines throws an exception?
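   
    A `try`/`finally` would guarantee the close runs even if detection or extraction throws (try-with-resources would be tidier still, if this version of Tabula's `ObjectExtractor` implements `Closeable` — worth checking):
   
    ```java
    ObjectExtractor objectExtractor = new ObjectExtractor(document);
    List<Table> tables = new ArrayList<>();
    try {
      PageIterator pages = objectExtractor.extract();
      while (pages.hasNext()) {
        Page page = pages.next();
        for (Rectangle guessRect : detectionAlgorithm.detect(page)) {
          tables.addAll(algorithm.extract(page.getArea(guessRect)));
        }
      }
    } finally {
      try {
        objectExtractor.close();
      } catch (Exception e) {
        logger.warn("Error closing object extractor", e);
      }
    }
    ```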
   

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/Utils.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.ObjectExtractor;
+import technology.tabula.Page;
+import technology.tabula.PageIterator;
+import technology.tabula.Rectangle;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+import technology.tabula.detectors.NurminenDetectionAlgorithm;
+import technology.tabula.extractors.BasicExtractionAlgorithm;
+import technology.tabula.extractors.ExtractionAlgorithm;
+import technology.tabula.extractors.SpreadsheetExtractionAlgorithm;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Utils {
+
+  public static final ExtractionAlgorithm DEFAULT_ALGORITHM = new BasicExtractionAlgorithm();
+  private static final Logger logger = LoggerFactory.getLogger(Utils.class);
+
+  /**
+   * Returns a list of tables found in a given PDF document.  There are several extraction algorithms
+   * available and this function uses the default Basic Extraction Algorithm.
+   * @param document The input PDF document to search for tables
+   * @return A list of tables found in the document.
+   */
+  public static List<Table> extractTablesFromPDF(PDDocument document) {
+    return extractTablesFromPDF(document, DEFAULT_ALGORITHM);
+  }
+
+  public static List<Table> extractTablesFromPDF(PDDocument document, ExtractionAlgorithm algorithm) {
+    System.setProperty("java.awt.headless", "true");
+
+    NurminenDetectionAlgorithm detectionAlgorithm = new NurminenDetectionAlgorithm();
+
+    ExtractionAlgorithm algExtractor;
+
+    SpreadsheetExtractionAlgorithm extractor = new SpreadsheetExtractionAlgorithm();
+
+    ObjectExtractor objectExtractor = new ObjectExtractor(document);
+    PageIterator pages = objectExtractor.extract();
+    List<Table> tables= new ArrayList<>();
+    while (pages.hasNext()) {
+      Page page = pages.next();
+
+      algExtractor = algorithm;
+      List<Rectangle> tablesOnPage = detectionAlgorithm.detect(page);
+
+      for (Rectangle guessRect : tablesOnPage) {
+        Page guess = page.getArea(guessRect);
+        tables.addAll(algExtractor.extract(guess));
+      }
+    }
+
+    try {
+      objectExtractor.close();
+    } catch (Exception e) {
+      logger.debug("Error closing Object extractor.");
+    }
+
+    return tables;
+  }
+
+  /**
+   * Returns the values contained in a PDF Table row
+   * @param table The source table
+   * @return A list of the header rows
+   */
+  public static List<String> extractRowValues(Table table) {
+    List<RectangularTextContainer> firstRow = table.getRows().get(0);
+    List<String> values = new ArrayList<>();
+
+    if (firstRow != null) {
+      for (int i =0; i < firstRow.size(); i++) {

Review comment:
       Nit: space after `=`.
   
   What happens if the rows have different sizes? Is that possible if we guessed wrong about the table? If the table merges cells? How should we handle such issues? Do we have tests for them?
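   
    If ragged rows are possible, `processRow()` could guard against them with something like this sketch:
   
    ```java
    // Sketch: never index past either the row or the data-column writers.
    // (columnHeaders.size() excludes the metadata writers at the tail.)
    int cellCount = Math.min(row.size(), columnHeaders.size());
    for (int i = 0; i < cellCount; i++) {
      String value = row.get(i).getText();
      if (!Strings.isNullOrEmpty(value)) {
        writers.get(i).load(row.get(i));
      }
    }
    // Extra cells are dropped; missing cells remain NULL.
    ```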

##########
File path: contrib/format-pdf/src/test/resources/logback-test.xml
##########
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8" ?>

Review comment:
       Not clear if we want to do this. IIRC, Logback gets very confused if there are multiple config files on the class path: it prints a warning and arbitrarily picks one.
   
   Why do we need a separate config file for this module?

##########
File path: contrib/format-pdf/src/main/resources/bootstrap-format-plugins.json
##########
@@ -0,0 +1,43 @@
+{
+  "storage":{
+    "dfs": {
+      "type": "file",
+      "formats": {
+        "pdf": {
+          "type": "pdf",
+          "extensions": [
+            "pdf"
+          ],
+          "extractHeaders": true,
+          "combinePages": false
+        }
+      }
+    },
+    "cp": {

Review comment:
       Will we provide any PDFs built into Drill?

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatConfig.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+@JsonTypeName(PdfFormatPlugin.DEFAULT_NAME)
+public class PdfFormatConfig implements FormatPluginConfig {
+
+  private final List<String> extensions;
+  private final boolean extractHeaders;
+  private final String extractionAlgorithm;
+  private final boolean combinePages;
+  private final int defaultTableIndex;
+
+  @JsonCreator
+  public PdfFormatConfig(@JsonProperty("extensions") List<String> extensions,
+                         @JsonProperty("extractHeaders") boolean extractHeaders,
+                         @JsonProperty("extractionAlgorithm") String extractionAlgorithm,
+                         @JsonProperty("combinePages") boolean combinePages,
+                         @JsonProperty("defaultTableIndex") int defaultTableIndex) {
+    this.extensions = extensions == null
+      ? Collections.singletonList("pdf")

Review comment:
       `extractionAlgorithm` is not described in the README.
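   
    It should be documented alongside the other options, e.g. (the value names here are assumptions based on the Tabula extractors):
   
    ```json
    "pdf": {
      "type": "pdf",
      "extensions": ["pdf"],
      "extractHeaders": true,
      "combinePages": false,
      "extractionAlgorithm": "spreadsheet"
    }
    ```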

##########
File path: contrib/format-pdf/src/test/resources/logback-test.xml
##########
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<configuration>
+  <if condition='property("drill.lilith.enable").equalsIgnoreCase("true")'>
+    <then>
+      <appender name="SOCKET" class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
+        <Compressing>true</Compressing>
+        <ReconnectionDelay>10000</ReconnectionDelay>
+        <IncludeCallerData>true</IncludeCallerData>
+        <RemoteHosts>${LILITH_HOSTNAME:-localhost}</RemoteHosts>
+      </appender>
+      <logger name="org.apache.drill" additivity="false">
+        <level value="DEBUG"/>
+        <appender-ref ref="SOCKET"/>
+      </logger>
+      <logger name="query.logger" additivity="false">
+        <level value="ERROR"/>
+        <appender-ref ref="SOCKET"/>
+      </logger>
+      <logger name="org.apache.drill.exec.store.pdf">
+        <level value="DEBUG"/>
+        <appender-ref ref="SOCKET"/>
+      </logger>
+    </then>
+  </if>
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <!-- encoders are assigned the type
+         ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <logger name="org.apache.drill.exec.store.pdf" additivity="false">
+    <level value="DEBUG" />
+    <appender-ref ref="STDOUT" />
+  </logger>
+</configuration>

Review comment:
       Missing trailing newline at the end of the file.

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfBatchReader.java
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.pdfbox.pdmodel.PDDocument;
+import org.apache.pdfbox.pdmodel.PDDocumentInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import technology.tabula.RectangularTextContainer;
+import technology.tabula.Table;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+public class PdfBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PdfBatchReader.class);
+  private static final String NEW_FIELD_PREFIX = "field_";
+  private final int maxRecords;
+
+  private final List<PdfColumnWriter> writers;
+  private FileSplit split;
+  private CustomErrorContext errorContext;
+  private RowSetLoader rowWriter;
+  private InputStream fsStream;
+  private PDDocument document;
+  private PdfReaderConfig config;
+
+  private SchemaBuilder builder;
+  private List<String> columnHeaders;
+  private int currentRowIndex;
+  private Table currentTable;
+  private int currentTableIndex;
+  private int startingTableIndex;
+  private FileScanFramework.FileSchemaNegotiator negotiator;
+
+  // Document Metadata Fields
+  private int pageCount;
+  private String title;
+  private String author;
+  private String subject;
+  private String keywords;
+  private String creator;
+  private String producer;
+  private Calendar creationDate;
+  private Calendar modificationDate;
+  private String trapped;
+  private int unregisteredColumnCount;
+  private int columns;
+  private int tableCount;
+  private int rows;
+  private int metadataIndex;
+
+
+  // Tables
+  private List<Table> tables;
+
+
+  static class PdfReaderConfig {
+    final PdfFormatPlugin plugin;
+
+    PdfReaderConfig(PdfFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public PdfBatchReader(PdfReaderConfig readerConfig, int maxRecords) {
+    this.maxRecords = maxRecords;
+    this.unregisteredColumnCount = 0;
+    this.writers = new ArrayList<>();
+    this.config = readerConfig;
+    this.startingTableIndex = readerConfig.plugin.getConfig().getDefaultTableIndex() < 0 ? 0 : readerConfig.plugin.getConfig().getDefaultTableIndex();
+    this.currentTableIndex = this.startingTableIndex;
+    this.columnHeaders = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+    System.setProperty("java.awt.headless", "true");
+    this.negotiator = negotiator;
+
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    builder = new SchemaBuilder();
+
+    openFile();
+
+    // Get the tables
+    tables = Utils.extractTablesFromPDF(document);
+    populateMetadata();
+    logger.debug("Found {} tables", tables.size());
+
+    // Support provided schema
+    TupleMetadata schema = null;
+    if (this.negotiator.hasProvidedSchema()) {
+      schema = this.negotiator.providedSchema();
+      this.negotiator.tableSchema(schema, false);
+    } else {
+      this.negotiator.tableSchema(buildSchema(), false);
+    }
+    ResultSetLoader loader = this.negotiator.build();
+    rowWriter = loader.writer();
+
+    if (negotiator.hasProvidedSchema()) {
+      buildWriterListFromProvidedSchema(schema);
+    } else {
+      buildWriterList();
+    }
+    addImplicitColumnsToSchema();
+
+    // Prepare for reading
+    currentRowIndex = 1;  // Skip the first line if there are headers
+    currentTable = tables.get(startingTableIndex);
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    System.setProperty("java.awt.headless", "true");
+
+    while(!rowWriter.isFull()) {
+      // Check to see if the limit has been reached
+      if (rowWriter.limitReached(maxRecords)) {
+        return false;
+      } else if (currentRowIndex >= currentTable.getRows().size() &&

Review comment:
       If so, I'm a bit confused by the comparison of row count and table count. Imagine each table has 100K rows: a single table would have to split across Drill batches. Conversely, if each table has only 3 rows, all of the tables should fit in a single Drill batch.
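   
    A sketch of a `next()` loop that decouples table boundaries from batch boundaries — `rowWriter.isFull()` alone decides where a batch ends, and tables just roll over:
   
    ```java
    while (!rowWriter.isFull()) {
      if (currentRowIndex >= currentTable.getRows().size()) {
        // Current table exhausted; move on only if we're combining tables.
        currentTableIndex++;
        if (currentTableIndex >= tables.size() ||
            !config.plugin.getConfig().getCombinePages()) {
          return false;  // EOF for this reader
        }
        currentTable = tables.get(currentTableIndex);
        currentRowIndex = config.plugin.getConfig().getExtractHeaders() ? 1 : 0;
      }
      processRow(currentTable.getRows().get(currentRowIndex++));
    }
    return true;  // batch is full; more data remains
    ```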

##########
File path: contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatConfig.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pdf;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+@JsonTypeName(PdfFormatPlugin.DEFAULT_NAME)
+public class PdfFormatConfig implements FormatPluginConfig {
+
+  private final List<String> extensions;
+  private final boolean extractHeaders;
+  private final String extractionAlgorithm;
+  private final boolean combinePages;
+  private final int defaultTableIndex;
+
+  @JsonCreator
+  public PdfFormatConfig(@JsonProperty("extensions") List<String> extensions,
+                         @JsonProperty("extractHeaders") boolean extractHeaders,
+                         @JsonProperty("extractionAlgorithm") String extractionAlgorithm,
+                         @JsonProperty("combinePages") boolean combinePages,
+                         @JsonProperty("defaultTableIndex") int defaultTableIndex) {
+    this.extensions = extensions == null
+      ? Collections.singletonList("pdf")
+      : ImmutableList.copyOf(extensions);
+    this.extractHeaders = extractHeaders;
+    this.extractionAlgorithm = extractionAlgorithm;
+    this.combinePages = combinePages;
+    this.defaultTableIndex = defaultTableIndex;
+  }
+
+  public PdfBatchReader.PdfReaderConfig getReaderConfig(PdfFormatPlugin plugin) {
+    return new PdfBatchReader.PdfReaderConfig(plugin);
+  }
+
+  @JsonInclude(Include.NON_DEFAULT)
+  @JsonProperty("extensions")
+  public List<String> getExtensions() {
+    return extensions;
+  }
+
+  @JsonInclude(Include.NON_DEFAULT)
+  @JsonProperty("combinePages")
+  public boolean getCombinePages() { return combinePages; }
+
+  @JsonInclude(Include.NON_DEFAULT)
+  @JsonProperty("extractHeaders")
+  public boolean getExtractHeaders() {
+    return extractHeaders;
+  }
+
+  @JsonProperty("extractionAlgorithm")
+  public String getExtractionAlgorithm() {
+    return extractionAlgorithm;
+  }
+
+  @JsonProperty("defaultTableIndex")
+  public int getDefaultTableIndex() { return defaultTableIndex; }

Review comment:
       Not sure I like this name. What is a `defaultTableIndex`? I'm specifying it, right? So, it's not actually a "default." It is a `tableIndex`. The default is what we get if I don't specify anything.
   
   If I don't specify anything I get... all tables? So, I can either get one table or all tables? What if I want tables 1, 5 and 20-32? Should we allow the user to specify a range, such as in the print dialog of any Windows app?
   
   And, if we did, none of these would be "default", would they?
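   
    If we went the range route, the parsing is cheap. A sketch, where `tableRanges` is a hypothetical config string such as `"1,5,20-32"`:
   
    ```java
    // Sketch: expand "1,5,20-32" into an ordered set of table indexes.
    // Requires java.util.Set and java.util.TreeSet.
    public static Set<Integer> parseTableRanges(String tableRanges) {
      Set<Integer> indexes = new TreeSet<>();
      for (String part : tableRanges.split(",")) {
        String[] bounds = part.trim().split("-");
        int start = Integer.parseInt(bounds[0].trim());
        int end = bounds.length > 1 ? Integer.parseInt(bounds[1].trim()) : start;
        for (int i = start; i <= end; i++) {
          indexes.add(i);
        }
      }
      return indexes;
    }
    ```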




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org