Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/01/09 16:49:37 UTC

[GitHub] [drill] luocooong opened a new pull request #2141: DRILL-7536: Convert Image format to EVF

luocooong opened a new pull request #2141:
URL: https://github.com/apache/drill/pull/2141


   # [DRILL-7536](https://issues.apache.org/jira/browse/DRILL-7536): Convert Image format to EVF
   
   ## Description
 Converts the Image format plugin to use EVF. The work is not yet complete:
   - [x] Core module
   - [x] Code cleanup
   - [ ] Add more unit tests
   
   ## Documentation
    There are no changes visible to the user.
   
   ## Testing
   
    Ran unit tests associated with the Image plugin.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] luocooong commented on pull request #2141: DRILL-7536: Convert Image format to EVF

Posted by GitBox <gi...@apache.org>.
luocooong commented on pull request #2141:
URL: https://github.com/apache/drill/pull/2141#issuecomment-761554049


   @cgivre All done. Thanks again.


[GitHub] [drill] cgivre commented on pull request #2141: DRILL-7536: Convert Image format to EVF

Posted by GitBox <gi...@apache.org>.
cgivre commented on pull request #2141:
URL: https://github.com/apache/drill/pull/2141#issuecomment-757386502


   @luocooong 
   Thanks for the PR. Do you want a code review now, or should I wait for more unit tests?


[GitHub] [drill] luocooong commented on pull request #2141: DRILL-7536: Convert Image format to EVF

Posted by GitBox <gi...@apache.org>.
luocooong commented on pull request #2141:
URL: https://github.com/apache/drill/pull/2141#issuecomment-757393782


   @cgivre Good idea. I will move this to the `contrib` folder once the review is done, and then I think adding more unit tests will not affect the core module code.


[GitHub] [drill] cgivre merged pull request #2141: DRILL-7536: Convert Image format to EVF

Posted by GitBox <gi...@apache.org>.
cgivre merged pull request #2141:
URL: https://github.com/apache/drill/pull/2141


   



[GitHub] [drill] cgivre edited a comment on pull request #2141: DRILL-7536: Convert Image format to EVF

Posted by GitBox <gi...@apache.org>.
cgivre edited a comment on pull request #2141:
URL: https://github.com/apache/drill/pull/2141#issuecomment-757386649


   One more thing: can we move this to the `contrib` folder and include a `README`? We can do that once we're done with the code review.


[GitHub] [drill] cgivre commented on a change in pull request #2141: DRILL-7536: Convert Image format to EVF

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2141:
URL: https://github.com/apache/drill/pull/2141#discussion_r556549991



##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageBatchReader.java
##########
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.image;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedHashMap;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.drew.imaging.FileType;
+import com.drew.imaging.FileTypeDetector;
+import com.drew.imaging.ImageMetadataReader;
+import com.drew.imaging.ImageProcessingException;
+import com.drew.metadata.Directory;
+import com.drew.metadata.Metadata;
+import com.drew.metadata.eps.EpsDirectory;
+import com.drew.metadata.exif.ExifIFD0Directory;
+import com.drew.metadata.xmp.XmpDirectory;
+
+public class ImageBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(ImageBatchReader.class);

Review comment:
       For loggers, please write out the fully qualified class name here and elsewhere, i.e.:
   ```java
    org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImageBatchReader.class);
   ```
   

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageBatchReader.java
##########
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.image;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedHashMap;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.drew.imaging.FileType;
+import com.drew.imaging.FileTypeDetector;
+import com.drew.imaging.ImageMetadataReader;
+import com.drew.imaging.ImageProcessingException;
+import com.drew.metadata.Directory;
+import com.drew.metadata.Metadata;
+import com.drew.metadata.eps.EpsDirectory;
+import com.drew.metadata.exif.ExifIFD0Directory;
+import com.drew.metadata.xmp.XmpDirectory;
+
+public class ImageBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(ImageBatchReader.class);
+
+  private final ImageFormatConfig config;
+  private final EasySubScan scan;
+  private CustomErrorContext errorContext;
+  private Path path;
+  private FileStatus fileStatus;
+  private BufferedInputStream metaInputStream;
+  private RowSetLoader loader;
+  private LinkedHashMap<String, ColumnDefn> genericColumns;
+  private Metadata metadata;
+
+  public ImageBatchReader(final ImageFormatConfig config, final EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    try {
+      errorContext = negotiator.parentErrorContext();
+      DrillFileSystem dfs = negotiator.fileSystem();
+      path = dfs.makeQualified(negotiator.split().getPath());
+      fileStatus = dfs.getFileStatus(path);
+      metaInputStream = new BufferedInputStream(dfs.openPossiblyCompressedStream(path));
+      logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+    } catch (IOException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to open the image input stream. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    }
+    // define the schema
+    negotiator.tableSchema(defineMetadata(), false);
+    ResultSetLoader resultSetLoader = negotiator.build();
+    loader = resultSetLoader.writer();
+    // bind the writer for generic columns
+    bindColumns(loader);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    try {
+      loader.start();
+      // process generic metadata
+      processGenericMetadata();
+      // process external metadata
+      processExternalMetadata();
+      loader.save();
+    } catch (IOException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to detect the file type. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    } catch (ImageProcessingException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Error in reading metadata from the input stream. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Error in processing metadata directory. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    }
+    return false;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(metaInputStream);
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    genericColumns = new LinkedHashMap<>();
+    Collection<String> tags = GenericMetadataDirectory._tagNameMap.values();
+    for (String tagName : tags) {
+      if (!config.hasFileSystemMetadata() && ImageMetadataUtils.isSkipTag(tagName)) {
+        continue;
+      }
+      ColumnDefn columnDefn = new GenericColumnDefn(tagName);
+      if (config.isDescriptive()) {
+        columnDefn.defineText(builder);
+      } else {
+        columnDefn.define(builder);
+      }
+      genericColumns.put(ImageMetadataUtils.formatName(tagName), columnDefn);
+    }
+    Collections.unmodifiableMap(genericColumns);
+    return builder.buildSchema();
+  }
+
+  private void bindColumns(RowSetLoader loader) {
+    for (ColumnDefn columnDefn : genericColumns.values()) {
+      columnDefn.bind(loader);
+    }
+  }
+
+  private void processGenericMetadata() throws IOException, ImageProcessingException {
+    FileType fileType = FileTypeDetector.detectFileType(metaInputStream);
+    metadata = ImageMetadataReader.readMetadata(metaInputStream);
+    // Read for generic metadata at first
+    new GenericMetadataReader().read(fileType, fileStatus, metadata);
+    GenericMetadataDirectory genericMetadata = metadata.getFirstDirectoryOfType(GenericMetadataDirectory.class);
+    // Process the `Generic Metadata Directory`
+    ImageDirectoryProcessor.processGenericMetadataDirectory(genericMetadata, genericColumns, config);
+  }
+
+  private void processExternalMetadata() {
+    boolean skipEPSPreview = false;
+    for (Directory directory : metadata.getDirectories()) {
+      // Skip the `Generic Metadata Directory`
+      String dictName = ImageMetadataUtils.formatName(directory.getName());
+      if (directory instanceof GenericMetadataDirectory) {
+        continue;
+      }
+      if (directory instanceof ExifIFD0Directory && skipEPSPreview) {
+        skipEPSPreview = false;
+        continue;
+      }
+      if (directory instanceof EpsDirectory) {
+        // If an EPS file contains a TIFF preview, skip the next IFD0
+        skipEPSPreview = directory.containsTag(EpsDirectory.TAG_TIFF_PREVIEW_SIZE);
+      }
+      // Process the `External Metadata Directory`
+      MapColumnDefn columnDefn = new MapColumnDefn(dictName).builder(loader);
+      ImageDirectoryProcessor.processDirectory(columnDefn, directory, metadata, config);
+      // Continue to process XmpDirectory if exists
+      if (directory instanceof XmpDirectory) {
+        ImageDirectoryProcessor.processXmpDirectory(columnDefn, (XmpDirectory) directory);
+      }
+    }
+  }
+
+  protected abstract static class ColumnDefn {
+
+    private final String name;
+    private final String originName; // the unformatted tag name
+    private ScalarWriter writer;
+
+    public ColumnDefn(String name) {
+      this.originName = name;
+      this.name = ImageMetadataUtils.formatName(name);
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getOriginName() {
+      return originName;
+    }
+
+    public ScalarWriter getWriter() {
+      return writer;
+    }
+
+    public void bind(RowSetLoader loader) {
+      writer = loader.scalar(name);
+    }
+
+    public void defineText(SchemaBuilder builder) {
+      builder.add(getName(), Types.optional(MinorType.VARCHAR));
+    }
+
+    public abstract void define(SchemaBuilder builder);
+
+    public abstract void load(Object value);
+
+    public ScalarWriter addText(String name) {
+      throw new UnsupportedOperationException();
+    }
+
+    public ArrayWriter addList(String name) {
+      throw new UnsupportedOperationException();
+    }
+
+    public ArrayWriter addListMap(String name) {
+      throw new UnsupportedOperationException();
+    }
+
+    public TupleWriter addMap(String name) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  protected static class GenericColumnDefn extends ColumnDefn {
+
+    public GenericColumnDefn(String name) {
+      super(name);
+    }
+
+    @Override
+    public void define(SchemaBuilder builder) {
+      if (ImageMetadataUtils.isVarchar(getOriginName())) {
+        builder.add(getName(), Types.optional(MinorType.VARCHAR));
+      } else if (ImageMetadataUtils.isInt(getOriginName())) {
+        builder.add(getName(), Types.optional(MinorType.INT));
+      } else if (ImageMetadataUtils.isLong(getOriginName())) {
+        builder.add(getName(), Types.optional(MinorType.BIGINT));
+      } else if (ImageMetadataUtils.isDouble(getOriginName())) {
+        builder.add(getName(), Types.optional(MinorType.FLOAT8));
+      } else if (ImageMetadataUtils.isBoolean(getOriginName())) {
+        builder.add(getName(), Types.optional(MinorType.BIT));
+      } else if (ImageMetadataUtils.isDate(getOriginName())) {
+        builder.add(getName(), Types.optional(MinorType.TIMESTAMP));
+      }
+    }
+
+    @Override
+    public void load(Object value) {
+      if (value instanceof Date) {
+        getWriter().setTimestamp(Instant.ofEpochMilli(((Date) value).getTime()));

Review comment:
       Can we add this Date-to-timestamp conversion to the writer object?

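   A minimal sketch of what that could look like, assuming a hypothetical helper class (`ObjectLoader` is not part of this PR) that centralizes the conversion so each `ColumnDefn.load()` stays trivial:
   ```java
   import java.util.Date;

   import org.apache.drill.exec.vector.accessor.ScalarWriter;
   import org.joda.time.Instant;

   // Hypothetical helper: centralizes the java.util.Date conversion so that
   // individual ColumnDefn.load() implementations can simply delegate here.
   public final class ObjectLoader {

     private ObjectLoader() { }

     // Writes a value, converting java.util.Date to the Joda Instant that
     // ScalarWriter.setTimestamp() expects; all other types pass through.
     public static void load(ScalarWriter writer, Object value) {
       if (value instanceof Date) {
         writer.setTimestamp(Instant.ofEpochMilli(((Date) value).getTime()));
       } else {
         writer.setObject(value);
       }
     }
   }
   ```
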
##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatConfig.java
##########
@@ -15,59 +15,58 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.drill.exec.store.image;
 
 import java.util.List;
 import java.util.Objects;
 
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 
-@JsonTypeName("image") @JsonInclude(Include.NON_DEFAULT)
+@JsonTypeName(ImageFormatConfig.NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public class ImageFormatConfig implements FormatPluginConfig {
 
+  public static final String NAME = "image";
   private final List<String> extensions;
   private final boolean fileSystemMetadata;
   private final boolean descriptive;
   private final String timeZone;
 
-  public ImageFormatConfig() {
-    this(null, null, null, null);
-  }
-
   @JsonCreator
   public ImageFormatConfig(
       @JsonProperty("extensions") List<String> extensions,
       @JsonProperty("fileSystemMetadata") Boolean fileSystemMetadata,
       @JsonProperty("descriptive") Boolean descriptive,
       @JsonProperty("timeZone") String timeZone) {
-    this.extensions = extensions == null ?
-        ImmutableList.of() : ImmutableList.copyOf(extensions);
+    this.extensions = extensions == null ? ImmutableList.of() : ImmutableList.copyOf(extensions);
     this.fileSystemMetadata = fileSystemMetadata == null ? true : fileSystemMetadata;

Review comment:
       This can be simplified to:
   ```java
   this.fileSystemMetadata = fileSystemMetadata == null || fileSystemMetadata;
   ```

##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/store/image/TestImageBatchReader.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.image;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Paths;
+import java.util.Arrays;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.QueryTestUtil;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(RowSetTests.class)
+public class TestImageBatchReader extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+    cluster.defineFormat("dfs", "image", new ImageFormatConfig(Arrays.asList("bmp", "jpg", "mp4"), false, false, null));

Review comment:
       Can we add some tests to exercise the various configuration options?

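   A hedged sketch of one such test, reusing the `ClusterTest` helpers imported above; the second format name, the sample file, and the expected column types are illustrative assumptions, not part of the PR:
   ```java
   @Test
   public void testDescriptiveOption() throws Exception {
     // Hypothetical second registration: descriptive = true should render
     // every generic metadata tag as human-readable VARCHAR text.
     cluster.defineFormat("dfs", "image_desc",
         new ImageFormatConfig(Arrays.asList("jpeg"), true, true, "UTC"));

     // `image/sample.jpeg` is a placeholder for a test image under the dfs root.
     String sql = "SELECT Format, PixelWidth FROM dfs.`image/sample.jpeg`";
     RowSet results = queryBuilder().sql(sql).rowSet();

     // With descriptive = true, both columns should come back as VARCHAR.
     TupleMetadata schema = results.schema();
     assertEquals(MinorType.VARCHAR, schema.metadata("Format").type());
     assertEquals(MinorType.VARCHAR, schema.metadata("PixelWidth").type());
     results.clear();
   }
   ```
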
##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageBatchReader.java
##########
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.image;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedHashMap;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.drew.imaging.FileType;
+import com.drew.imaging.FileTypeDetector;
+import com.drew.imaging.ImageMetadataReader;
+import com.drew.imaging.ImageProcessingException;
+import com.drew.metadata.Directory;
+import com.drew.metadata.Metadata;
+import com.drew.metadata.eps.EpsDirectory;
+import com.drew.metadata.exif.ExifIFD0Directory;
+import com.drew.metadata.xmp.XmpDirectory;
+
+public class ImageBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(ImageBatchReader.class);
+
+  private final ImageFormatConfig config;
+  private final EasySubScan scan;
+  private CustomErrorContext errorContext;
+  private Path path;
+  private FileStatus fileStatus;
+  private BufferedInputStream metaInputStream;
+  private RowSetLoader loader;
+  private LinkedHashMap<String, ColumnDefn> genericColumns;
+  private Metadata metadata;
+
+  public ImageBatchReader(final ImageFormatConfig config, final EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    try {
+      errorContext = negotiator.parentErrorContext();
+      DrillFileSystem dfs = negotiator.fileSystem();
+      path = dfs.makeQualified(negotiator.split().getPath());
+      fileStatus = dfs.getFileStatus(path);
+      metaInputStream = new BufferedInputStream(dfs.openPossiblyCompressedStream(path));
+      logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+    } catch (IOException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to open the image input stream. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    }
+    // define the schema
+    negotiator.tableSchema(defineMetadata(), false);
+    ResultSetLoader resultSetLoader = negotiator.build();
+    loader = resultSetLoader.writer();
+    // bind the writer for generic columns
+    bindColumns(loader);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    try {
+      loader.start();
+      // process generic metadata
+      processGenericMetadata();
+      // process external metadata
+      processExternalMetadata();
+      loader.save();
+    } catch (IOException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to detect the file type. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    } catch (ImageProcessingException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Error in reading metadata from the input stream. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Error in processing metadata directory. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    }
+    return false;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(metaInputStream);
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    genericColumns = new LinkedHashMap<>();
+    Collection<String> tags = GenericMetadataDirectory._tagNameMap.values();
+    for (String tagName : tags) {
+      if (!config.hasFileSystemMetadata() && ImageMetadataUtils.isSkipTag(tagName)) {
+        continue;
+      }
+      ColumnDefn columnDefn = new GenericColumnDefn(tagName);
+      if (config.isDescriptive()) {
+        columnDefn.defineText(builder);
+      } else {
+        columnDefn.define(builder);
+      }
+      genericColumns.put(ImageMetadataUtils.formatName(tagName), columnDefn);
+    }
+    Collections.unmodifiableMap(genericColumns);
+    return builder.buildSchema();
+  }
+
+  private void bindColumns(RowSetLoader loader) {
+    for (ColumnDefn columnDefn : genericColumns.values()) {
+      columnDefn.bind(loader);
+    }
+  }
+
+  private void processGenericMetadata() throws IOException, ImageProcessingException {
+    FileType fileType = FileTypeDetector.detectFileType(metaInputStream);
+    metadata = ImageMetadataReader.readMetadata(metaInputStream);
+    // Read for generic metadata at first
+    new GenericMetadataReader().read(fileType, fileStatus, metadata);
+    GenericMetadataDirectory genericMetadata = metadata.getFirstDirectoryOfType(GenericMetadataDirectory.class);
+    // Process the `Generic Metadata Directory`
+    ImageDirectoryProcessor.processGenericMetadataDirectory(genericMetadata, genericColumns, config);
+  }
+
+  private void processExternalMetadata() {
+    boolean skipEPSPreview = false;
+    for (Directory directory : metadata.getDirectories()) {
+      // Skip the `Generic Metadata Directory`
+      String dictName = ImageMetadataUtils.formatName(directory.getName());
+      if (directory instanceof GenericMetadataDirectory) {
+        continue;
+      }
+      if (directory instanceof ExifIFD0Directory && skipEPSPreview) {
+        skipEPSPreview = false;
+        continue;
+      }
+      if (directory instanceof EpsDirectory) {
+        // If an EPS file contains a TIFF preview, skip the next IFD0
+        skipEPSPreview = directory.containsTag(EpsDirectory.TAG_TIFF_PREVIEW_SIZE);
+      }
+      // Process the `External Metadata Directory`
+      MapColumnDefn columnDefn = new MapColumnDefn(dictName).builder(loader);
+      ImageDirectoryProcessor.processDirectory(columnDefn, directory, metadata, config);
+      // Continue to process XmpDirectory if exists
+      if (directory instanceof XmpDirectory) {
+        ImageDirectoryProcessor.processXmpDirectory(columnDefn, (XmpDirectory) directory);
+      }
+    }
+  }
+
+  protected abstract static class ColumnDefn {

Review comment:
       I wonder if we could add these abstract classes to the `EasyFormatPlugin` classes so that they are usable for future storage plugins.  
   
   If you take a look at https://github.com/apache/drill/pull/1962, you can see that I was working on refactoring them a bit to reduce the copy/paste code.
   

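   A rough sketch of the kind of shared base class that refactoring might produce; the package location and class name are hypothetical, with the format-agnostic parts (name handling, writer binding) lifted straight out of the `ColumnDefn` above:
   ```java
   package org.apache.drill.exec.store.dfs.easy; // hypothetical location

   import org.apache.drill.exec.physical.resultSet.RowSetLoader;
   import org.apache.drill.exec.record.metadata.SchemaBuilder;
   import org.apache.drill.exec.vector.accessor.ScalarWriter;

   // Hypothetical reusable base: the generic pieces of
   // ImageBatchReader.ColumnDefn, shared by future EVF-based readers.
   public abstract class EasyColumnDefn {

     private final String name;
     private ScalarWriter writer;

     protected EasyColumnDefn(String name) {
       this.name = name;
     }

     public String getName() {
       return name;
     }

     public ScalarWriter getWriter() {
       return writer;
     }

     // Bind this column to the row writer once the schema has been built.
     public void bind(RowSetLoader loader) {
       writer = loader.scalar(name);
     }

     // Each format supplies its own schema definition and value loading.
     public abstract void define(SchemaBuilder builder);

     public abstract void load(Object value);
   }
   ```
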
##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageBatchReader.java
##########
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.image;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedHashMap;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.drew.imaging.FileType;
+import com.drew.imaging.FileTypeDetector;
+import com.drew.imaging.ImageMetadataReader;
+import com.drew.imaging.ImageProcessingException;
+import com.drew.metadata.Directory;
+import com.drew.metadata.Metadata;
+import com.drew.metadata.eps.EpsDirectory;
+import com.drew.metadata.exif.ExifIFD0Directory;
+import com.drew.metadata.xmp.XmpDirectory;
+
+public class ImageBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(ImageBatchReader.class);
+
+  private final ImageFormatConfig config;
+  private final EasySubScan scan;
+  private CustomErrorContext errorContext;
+  private Path path;
+  private FileStatus fileStatus;
+  private BufferedInputStream metaInputStream;
+  private RowSetLoader loader;
+  private LinkedHashMap<String, ColumnDefn> genericColumns;
+  private Metadata metadata;
+
+  public ImageBatchReader(final ImageFormatConfig config, final EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    try {
+      errorContext = negotiator.parentErrorContext();
+      DrillFileSystem dfs = negotiator.fileSystem();
+      path = dfs.makeQualified(negotiator.split().getPath());
+      fileStatus = dfs.getFileStatus(path);
+      metaInputStream = new BufferedInputStream(dfs.openPossiblyCompressedStream(path));
+      logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+    } catch (IOException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to open the image input stream. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    }
+    // define the schema
+    negotiator.tableSchema(defineMetadata(), false);
+    ResultSetLoader resultSetLoader = negotiator.build();
+    loader = resultSetLoader.writer();
+    // bind the writer for generic columns
+    bindColumns(loader);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    try {
+      loader.start();
+      // process generic metadata
+      processGenericMetadata();
+      // process external metadata
+      processExternalMetadata();
+      loader.save();
+    } catch (IOException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to detect the file type. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    } catch (ImageProcessingException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Error in reading metadata from the input stream. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Error in processing metadata directory. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    }
+    return false;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(metaInputStream);
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    genericColumns = new LinkedHashMap<>();
+    Collection<String> tags = GenericMetadataDirectory._tagNameMap.values();
+    for (String tagName : tags) {
+      if (!config.hasFileSystemMetadata() && ImageMetadataUtils.isSkipTag(tagName)) {
+        continue;
+      }
+      ColumnDefn columnDefn = new GenericColumnDefn(tagName);
+      if (config.isDescriptive()) {
+        columnDefn.defineText(builder);
+      } else {
+        columnDefn.define(builder);
+      }
+      genericColumns.put(ImageMetadataUtils.formatName(tagName), columnDefn);
+    }
+    Collections.unmodifiableMap(genericColumns);
+    return builder.buildSchema();
+  }
+
+  private void bindColumns(RowSetLoader loader) {
+    for (ColumnDefn columnDefn : genericColumns.values()) {
+      columnDefn.bind(loader);
+    }
+  }
+
+  private void processGenericMetadata() throws IOException, ImageProcessingException {
+    FileType fileType = FileTypeDetector.detectFileType(metaInputStream);
+    metadata = ImageMetadataReader.readMetadata(metaInputStream);
+    // Read for generic metadata at first
+    new GenericMetadataReader().read(fileType, fileStatus, metadata);
+    GenericMetadataDirectory genericMetadata = metadata.getFirstDirectoryOfType(GenericMetadataDirectory.class);
+    // Process the `Generic Metadata Directory`
+    ImageDirectoryProcessor.processGenericMetadataDirectory(genericMetadata, genericColumns, config);
+  }
+
+  private void processExternalMetadata() {
+    boolean skipEPSPreview = false;
+    for (Directory directory : metadata.getDirectories()) {
+      // Skip the `Generic Metadata Directory`
+      String dictName = ImageMetadataUtils.formatName(directory.getName());
+      if (directory instanceof GenericMetadataDirectory) {
+        continue;
+      }
+      if (directory instanceof ExifIFD0Directory && skipEPSPreview) {
+        skipEPSPreview = false;
+        continue;
+      }
+      if (directory instanceof EpsDirectory) {
+        // If an EPS file contains a TIFF preview, skip the next IFD0
+        skipEPSPreview = directory.containsTag(EpsDirectory.TAG_TIFF_PREVIEW_SIZE);
+      }
+      // Process the `External Metadata Directory`
+      MapColumnDefn columnDefn = new MapColumnDefn(dictName).builder(loader);
+      ImageDirectoryProcessor.processDirectory(columnDefn, directory, metadata, config);
+      // Continue to process XmpDirectory if exists
+      if (directory instanceof XmpDirectory) {
+        ImageDirectoryProcessor.processXmpDirectory(columnDefn, (XmpDirectory) directory);
+      }
+    }
+  }
+
+  protected abstract static class ColumnDefn {
+
+    private final String name;
+    private final String originName; // the unformatted tag name
+    private ScalarWriter writer;
+
+    public ColumnDefn(String name) {
+      this.originName = name;
+      this.name = ImageMetadataUtils.formatName(name);
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getOriginName() {
+      return originName;
+    }
+
+    public ScalarWriter getWriter() {
+      return writer;
+    }
+
+    public void bind(RowSetLoader loader) {
+      writer = loader.scalar(name);
+    }
+
+    public void defineText(SchemaBuilder builder) {
+      builder.add(getName(), Types.optional(MinorType.VARCHAR));
+    }
+
+    public abstract void define(SchemaBuilder builder);
+
+    public abstract void load(Object value);
+
+    public ScalarWriter addText(String name) {
+      throw new UnsupportedOperationException();
+    }
+
+    public ArrayWriter addList(String name) {
+      throw new UnsupportedOperationException();
+    }
+
+    public ArrayWriter addListMap(String name) {
+      throw new UnsupportedOperationException();
+    }
+
+    public TupleWriter addMap(String name) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  protected static class GenericColumnDefn extends ColumnDefn {
+
+    public GenericColumnDefn(String name) {
+      super(name);
+    }
+
+    @Override
+    public void define(SchemaBuilder builder) {
+      if (ImageMetadataUtils.isVarchar(getOriginName())) {
+        builder.add(getName(), Types.optional(MinorType.VARCHAR));
+      } else if (ImageMetadataUtils.isInt(getOriginName())) {
+        builder.add(getName(), Types.optional(MinorType.INT));
+      } else if (ImageMetadataUtils.isLong(getOriginName())) {
+        builder.add(getName(), Types.optional(MinorType.BIGINT));
+      } else if (ImageMetadataUtils.isDouble(getOriginName())) {
+        builder.add(getName(), Types.optional(MinorType.FLOAT8));
+      } else if (ImageMetadataUtils.isBoolean(getOriginName())) {
+        builder.add(getName(), Types.optional(MinorType.BIT));
+      } else if (ImageMetadataUtils.isDate(getOriginName())) {
+        builder.add(getName(), Types.optional(MinorType.TIMESTAMP));
+      }
+    }
+
+    @Override
+    public void load(Object value) {
+      if (value instanceof Date) {
+        getWriter().setTimestamp(Instant.ofEpochMilli(((Date) value).getTime()));
+      } else {
+        getWriter().setObject(value);
+      }
+    }
+  }
+
+  protected static class MapColumnDefn extends ColumnDefn {
+
+    private int index;
+    private TupleWriter writer;
+
+    public MapColumnDefn(String name) {
+      super(name);
+    }
+
+    @Override
+    public void bind(RowSetLoader loader) {
+      index = loader.tupleSchema().index(getName());
+      if (index == -1) {
+        index = loader.addColumn(SchemaBuilder.columnSchema(getName(), MinorType.MAP, DataMode.REQUIRED));
+      }
+      writer = loader.tuple(index);
+    }
+
+    @Override
+    public void define(SchemaBuilder builder) { }
+
+    @Override
+    public void load(Object value) { }
+
+    public MapColumnDefn builder(RowSetLoader loader) {
+      bind(loader);
+      return this;
+    }
+
+    public MapColumnDefn builder(TupleWriter writer) {
+      this.writer = writer;
+      return this;
+    }
+
+    /**
+     * example : { a : 1 } > { a : 1, b : "2" }
+     */
+    @Override
+    public ScalarWriter addText(String name) {
+      index = writer.tupleSchema().index(name);
+      if (index == -1) {
+        index = writer.addColumn(SchemaBuilder.columnSchema(name, MinorType.VARCHAR, DataMode.OPTIONAL));
+      } else { // rewrite the value
+        writer.column(name).events().restartRow();
+      }
+      return writer.scalar(index);
+    }
+
+    /**
+     * example : { a : 1 } > { a : 1, [ b : "2" ] }
+     */
+    @Override
+    public ArrayWriter addList(String name) {
+      index = writer.tupleSchema().index(name);
+      if (index == -1) {
+        index = writer.addColumn(SchemaBuilder.columnSchema(name, MinorType.VARCHAR, DataMode.REPEATED));
+      }
+      return writer.array(index);
+    }
+
+    /**
+     * example : { a : 1 } > { a : 1, [ { b : 2 } ] }
+     */
+    @Override
+    public ArrayWriter addListMap(String name) {
+      index = writer.tupleSchema().index(name);
+      if (index == -1) {
+        index = writer.addColumn(SchemaBuilder.columnSchema(name, MinorType.MAP, DataMode.REPEATED));
+      }
+      return writer.array(index);
+    }
+
+    /**
+     * example : { a : 1 } > { a : 1, { b : 2 } }
+     */
+    @Override
+    public TupleWriter addMap(String name) {
+      index = writer.tupleSchema().index(name);
+      if (index == -1) {
+        index = writer.addColumn(SchemaBuilder.columnSchema(name, MinorType.MAP, DataMode.REQUIRED));
+        return writer.tuple(index);
+      }
+      return writer.tuple(name);
+    }
+
+    /**
+     * example : { a : 1 } > { a : 1, [ 0, -1, 0, -1 ] }
+     */
+    public ArrayWriter addListByte(String name) {
+      index = writer.tupleSchema().index(name);
+      if (index == -1) {
+        index = writer.addColumn(SchemaBuilder.columnSchema(name, MinorType.TINYINT, DataMode.REPEATED));
+      }
+      return writer.array(index);
+    }
+
+    /**
+     * example : { a : 1 } > { a : 1, b : 2.0 }
+     */
+    public ScalarWriter addDouble(String name) {
+      index = writer.tupleSchema().index(name);
+      if (index == -1) {
+        index = writer.addColumn(SchemaBuilder.columnSchema(name, MinorType.FLOAT8, DataMode.OPTIONAL));
+      }
+      return writer.scalar(index);
+    }
+
+    /**
+     * example : { a : 1 } > { a : 1, b : date() }
+     */
+    public ScalarWriter addDate(String name) {
+      index = writer.tupleSchema().index(name);
+      if (index == -1) {
+        index = writer.addColumn(SchemaBuilder.columnSchema(name, MinorType.TIMESTAMP, DataMode.OPTIONAL));
+      }
+      return writer.scalar(index);
+    }
+
+    /**
+     * example : { a : 1 } > { a : 1, b : object() }
+     */
+    public ScalarWriter addObject(String name, MinorType type) {
+      index = writer.tupleSchema().index(name);
+      if (index == -1) {
+        index = writer.addColumn(SchemaBuilder.columnSchema(name, type, DataMode.OPTIONAL));
+      }
+      return writer.scalar(index);
+    }
+
+    /**
+     * example : { a : 1 } > { a : 1, b : 2 }
+     */
+    public ScalarWriter addIntToMap(TupleWriter writer, String name) {
+      int index = writer.tupleSchema().index(name);
+      if (index == -1) {
+        index = writer.addColumn(SchemaBuilder.columnSchema(name, MinorType.INT, DataMode.OPTIONAL));
+      }
+      return writer.scalar(index);
+    }
+  }
+
+  protected static class ListColumnDefn extends ColumnDefn {
+
+    private int index;
+    private ArrayWriter writer;
+
+    public ListColumnDefn(String name) {
+      super(name);
+    }
+
+    @Override
+    public void define(SchemaBuilder builder) { }
+
+    @Override
+    public void load(Object value) { }
+
+    public ListColumnDefn builder(ArrayWriter writer) {
+      this.writer = writer;
+      return this;
+    }
+
+    /**
+     * example : [ ] > [ { a : "1" } ]
+     */
+    @Override
+    public ScalarWriter addText(String name) {
+      TupleWriter map = writer.tuple();
+      index = map.tupleSchema().index(name);

Review comment:
       `index` can be local to this method.

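   Applied to the quoted `addText`, and reconstructing the truncated body from the `MapColumnDefn.addText` pattern shown earlier, the suggestion would look roughly like:
   ```java
   @Override
   public ScalarWriter addText(String name) {
     TupleWriter map = writer.tuple();
     // Local variable instead of the shared field, so no state leaks between calls.
     int index = map.tupleSchema().index(name);
     if (index == -1) {
       index = map.addColumn(SchemaBuilder.columnSchema(name, MinorType.VARCHAR, DataMode.OPTIONAL));
     }
     return map.scalar(index);
   }
   ```
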
##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageDirectoryProcessor.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.image;
+
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.store.image.ImageBatchReader.ColumnDefn;
+import org.apache.drill.exec.store.image.ImageBatchReader.ListColumnDefn;
+import org.apache.drill.exec.store.image.ImageBatchReader.MapColumnDefn;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.adobe.internal.xmp.XMPException;
+import com.adobe.internal.xmp.XMPIterator;
+import com.adobe.internal.xmp.XMPMeta;
+import com.adobe.internal.xmp.options.IteratorOptions;
+import com.adobe.internal.xmp.properties.XMPPropertyInfo;
+import com.drew.lang.KeyValuePair;
+import com.drew.lang.Rational;
+import com.drew.metadata.Directory;
+import com.drew.metadata.Metadata;
+import com.drew.metadata.StringValue;
+import com.drew.metadata.Tag;
+import com.drew.metadata.exif.ExifIFD0Directory;
+import com.drew.metadata.exif.ExifSubIFDDirectory;
+import com.drew.metadata.exif.GpsDirectory;
+import com.drew.metadata.jpeg.JpegComponent;
+import com.drew.metadata.png.PngDirectory;
+import com.drew.metadata.xmp.XmpDirectory;
+
+public class ImageDirectoryProcessor {

Review comment:
       Can you explain what this class does exactly?  

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageMetadataUtils.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.image;
+
+import java.util.HashMap;
+
+import com.drew.metadata.Directory;
+import com.drew.metadata.exif.ExifInteropDirectory;
+import com.drew.metadata.exif.ExifSubIFDDirectory;
+import com.drew.metadata.exif.PanasonicRawIFD0Directory;
+import com.drew.metadata.exif.makernotes.FujifilmMakernoteDirectory;
+import com.drew.metadata.exif.makernotes.NikonType2MakernoteDirectory;
+import com.drew.metadata.exif.makernotes.OlympusCameraSettingsMakernoteDirectory;
+import com.drew.metadata.exif.makernotes.OlympusEquipmentMakernoteDirectory;
+import com.drew.metadata.exif.makernotes.OlympusFocusInfoMakernoteDirectory;
+import com.drew.metadata.exif.makernotes.OlympusImageProcessingMakernoteDirectory;
+import com.drew.metadata.exif.makernotes.OlympusMakernoteDirectory;
+import com.drew.metadata.exif.makernotes.OlympusRawDevelopment2MakernoteDirectory;
+import com.drew.metadata.exif.makernotes.OlympusRawDevelopmentMakernoteDirectory;
+import com.drew.metadata.exif.makernotes.OlympusRawInfoMakernoteDirectory;
+import com.drew.metadata.exif.makernotes.PanasonicMakernoteDirectory;
+import com.drew.metadata.exif.makernotes.SamsungType2MakernoteDirectory;
+import com.drew.metadata.exif.makernotes.SonyType6MakernoteDirectory;
+import com.drew.metadata.icc.IccDirectory;
+import com.drew.metadata.photoshop.PhotoshopDirectory;
+import com.drew.metadata.png.PngDirectory;
+
+public class ImageMetadataUtils {
+
+  public static boolean isVarchar(String name) {
+    HashMap<Integer, String> tags = GenericMetadataDirectory._tagNameMap;
+    // Format,Color Mode,Video Codec,Audio Codec
+    if (name.equals(tags.get(GenericMetadataDirectory.TAG_FORMAT))
+        || name.equals(tags.get(GenericMetadataDirectory.TAG_COLOR_MODE))
+        || name.equals(tags.get(GenericMetadataDirectory.TAG_VIDEO_CODEC))
+        || name.equals(tags.get(GenericMetadataDirectory.TAG_AUDIO_CODEC))) {
+      return true;
+    }
+    return false;
+  }
+
+  public static boolean isInt(String name) {
+    HashMap<Integer, String> tags = GenericMetadataDirectory._tagNameMap;
+    // Pixel Width,Pixel Height,Orientation,Bits Per Pixel,Audio Sample Size
+    if (name.equals(tags.get(GenericMetadataDirectory.TAG_PIXEL_WIDTH))
+        || name.equals(tags.get(GenericMetadataDirectory.TAG_PIXEL_HEIGHT))
+        || name.equals(tags.get(GenericMetadataDirectory.TAG_ORIENTATION))
+        || name.equals(tags.get(GenericMetadataDirectory.TAG_BITS_PER_PIXEL))
+        || name.equals(tags.get(GenericMetadataDirectory.TAG_AUDIO_SAMPLE_SIZE))) {
+      return true;
+    }
+    return false;
+  }
+
+  public static boolean isLong(String name) {
+    HashMap<Integer, String> tags = GenericMetadataDirectory._tagNameMap;
+    // File Size,Duration
+    if (name.equals(tags.get(GenericMetadataDirectory.TAG_FILE_SIZE))
+        || name.equals(tags.get(GenericMetadataDirectory.TAG_DURATION))) {
+      return true;
+    }
+    return false;
+  }
+
+  public static boolean isDouble(String name) {
+    HashMap<Integer, String> tags = GenericMetadataDirectory._tagNameMap;
+    // DPI Width,DPI Height,Frame Rate,Audio Sample Rate
+    if (name.equals(tags.get(GenericMetadataDirectory.TAG_DPI_WIDTH))
+        || name.equals(tags.get(GenericMetadataDirectory.TAG_DPI_HEIGHT))
+        || name.equals(tags.get(GenericMetadataDirectory.TAG_FRAME_RATE))
+        || name.equals(tags.get(GenericMetadataDirectory.TAG_AUDIO_SAMPLE_RATE))) {
+      return true;
+    }
+    return false;
+  }
+
+  public static boolean isBoolean(String name) {
+    HashMap<Integer, String> tags = GenericMetadataDirectory._tagNameMap;
+    // Has Alpha
+    if (name.equals(tags.get(GenericMetadataDirectory.TAG_HAS_ALPHA))) {
+      return true;
+    }
+    return false;
+  }
+
+  public static boolean isDate(String name) {
+    HashMap<Integer, String> tags = GenericMetadataDirectory._tagNameMap;
+    // File Date Time
+    if (name.equals(tags.get(GenericMetadataDirectory.TAG_FILE_DATE_TIME))) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Format the tag name (remove the spaces and special characters),
+   * e.g. "Pixel Width" becomes "PixelWidth".
+   * @param tagName the original tag name
+   * @return the formatted tag name
+   */
+  public static String formatName(final String tagName) {
+    StringBuilder builder = new StringBuilder();
+    boolean upperCase = true;
+    for (char c : tagName.toCharArray()) {
+      if (c == ' ' || c == '-' || c == '/') {
+        upperCase = true;
+      } else {
+        builder.append(upperCase ? Character.toUpperCase(c) : c);
+        upperCase = false;
+      }
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Whether the tag holds file system metadata and should be skipped
+   * when the (fileSystemMetadata) config option is false.
+   * @param name the tag name
+   * @return true if the tag should be skipped
+   */
+  public static boolean isSkipTag(String name) {
+    HashMap<Integer, String> tags = GenericMetadataDirectory._tagNameMap;
+    if (name.equals(tags.get(GenericMetadataDirectory.TAG_FILE_SIZE))

Review comment:
       I think you can remove these `if` statements, i.e.:
   ```
   // Instead of 
   if (x || y )
   
   // Just have 
   return x || y
   ```
   Here and elsewhere in this class.
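
   For example, `isLong` above could become (just a sketch, reusing the same tag constants already in this class):
   ```
   public static boolean isLong(String name) {
     HashMap<Integer, String> tags = GenericMetadataDirectory._tagNameMap;
     // File Size, Duration
     return name.equals(tags.get(GenericMetadataDirectory.TAG_FILE_SIZE))
         || name.equals(tags.get(GenericMetadataDirectory.TAG_DURATION));
   }
   ```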

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageBatchReader.java
##########
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.image;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedHashMap;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.drew.imaging.FileType;
+import com.drew.imaging.FileTypeDetector;
+import com.drew.imaging.ImageMetadataReader;
+import com.drew.imaging.ImageProcessingException;
+import com.drew.metadata.Directory;
+import com.drew.metadata.Metadata;
+import com.drew.metadata.eps.EpsDirectory;
+import com.drew.metadata.exif.ExifIFD0Directory;
+import com.drew.metadata.xmp.XmpDirectory;
+
+public class ImageBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(ImageBatchReader.class);
+
+  private final ImageFormatConfig config;
+  private final EasySubScan scan;
+  private CustomErrorContext errorContext;
+  private Path path;
+  private FileStatus fileStatus;
+  private BufferedInputStream metaInputStream;
+  private RowSetLoader loader;
+  private LinkedHashMap<String, ColumnDefn> genericColumns;
+  private Metadata metadata;
+
+  public ImageBatchReader(final ImageFormatConfig config, final EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    try {
+      errorContext = negotiator.parentErrorContext();
+      DrillFileSystem dfs = negotiator.fileSystem();
+      path = dfs.makeQualified(negotiator.split().getPath());
+      fileStatus = dfs.getFileStatus(path);
+      metaInputStream = new BufferedInputStream(dfs.openPossiblyCompressedStream(path));
+      logger.debug("The config is {}, root is {}, columns are {}", config, scan.getSelectionRoot(), scan.getColumns());
+    } catch (IOException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failure in initial image inputstream. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    }
+    // define the schema
+    negotiator.tableSchema(defineMetadata(), false);
+    ResultSetLoader resultSetLoader = negotiator.build();
+    loader = resultSetLoader.writer();
+    // bind the writer for generic columns
+    bindColumns(loader);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    try {
+      loader.start();
+      // process generic metadata
+      processGenericMetadata();
+      // process external metadata
+      processExternalMetadata();
+      loader.save();
+    } catch (IOException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to estimates the file type. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    } catch (ImageProcessingException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Error in reading metadata from inputstream. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Error in processing metadata directory. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    }
+    return false;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(metaInputStream);
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    genericColumns = new LinkedHashMap<>();
+    Collection<String> tags = GenericMetadataDirectory._tagNameMap.values();
+    for (String tagName : tags) {
+      if (!config.hasFileSystemMetadata() && ImageMetadataUtils.isSkipTag(tagName)) {
+        continue;
+      }
+      ColumnDefn columnDefn = new GenericColumnDefn(tagName);
+      if (config.isDescriptive()) {
+        columnDefn.defineText(builder);
+      } else {
+        columnDefn.define(builder);
+      }
+      genericColumns.put(ImageMetadataUtils.formatName(tagName), columnDefn);
+    }
+    Collections.unmodifiableMap(genericColumns);
+    return builder.buildSchema();
+  }
+
+  private void bindColumns(RowSetLoader loader) {
+    for (ColumnDefn columnDefn : genericColumns.values()) {
+      columnDefn.bind(loader);
+    }
+  }
+
+  private void processGenericMetadata() throws IOException, ImageProcessingException {
+    FileType fileType = FileTypeDetector.detectFileType(metaInputStream);
+    metadata = ImageMetadataReader.readMetadata(metaInputStream);
+    // Read the generic metadata first
+    new GenericMetadataReader().read(fileType, fileStatus, metadata);
+    GenericMetadataDirectory genericMetadata = metadata.getFirstDirectoryOfType(GenericMetadataDirectory.class);
+    // Process the `Generic Metadata Directory`
+    ImageDirectoryProcessor.processGenericMetadataDirectory(genericMetadata, genericColumns, config);
+  }
+
+  private void processExternalMetadata() {
+    boolean skipEPSPreview = false;
+    for (Directory directory : metadata.getDirectories()) {
+      // Skip the `Generic Metadata Directory`
+      String dictName = ImageMetadataUtils.formatName(directory.getName());
+      if (directory instanceof GenericMetadataDirectory) {
+        continue;
+      }
+      if (directory instanceof ExifIFD0Directory && skipEPSPreview) {
+        skipEPSPreview = false;
+        continue;
+      }
+      if (directory instanceof EpsDirectory) {
+        // If an EPS file contains a TIFF preview, skip the next IFD0
+        skipEPSPreview = directory.containsTag(EpsDirectory.TAG_TIFF_PREVIEW_SIZE);
+      }
+      // Process the `External Metadata Directory`
+      MapColumnDefn columnDefn = new MapColumnDefn(dictName).builder(loader);
+      ImageDirectoryProcessor.processDirectory(columnDefn, directory, metadata, config);
+      // Continue to process XmpDirectory if exists
+      if (directory instanceof XmpDirectory) {
+        ImageDirectoryProcessor.processXmpDirectory(columnDefn, (XmpDirectory) directory);
+      }
+    }
+  }
+
+  protected abstract static class ColumnDefn {
+
+    private final String name;
+    private final String originName; // the unformatted name
+    private ScalarWriter writer;
+
+    public ColumnDefn(String name) {
+      this.originName = name;
+      this.name = ImageMetadataUtils.formatName(name);
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getOriginName() {
+      return originName;
+    }
+
+    public ScalarWriter getWriter() {
+      return writer;
+    }
+
+    public void bind(RowSetLoader loader) {
+      writer = loader.scalar(name);
+    }
+
+    public void defineText(SchemaBuilder builder) {
+      builder.add(getName(), Types.optional(MinorType.VARCHAR));
+    }
+
+    public abstract void define(SchemaBuilder builder);
+
+    public abstract void load(Object value);
+
+    public ScalarWriter addText(String name) {
+      throw new UnsupportedOperationException();
+    }
+
+    public ArrayWriter addList(String name) {
+      throw new UnsupportedOperationException();
+    }
+
+    public ArrayWriter addListMap(String name) {
+      throw new UnsupportedOperationException();
+    }
+
+    public TupleWriter addMap(String name) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  protected static class GenericColumnDefn extends ColumnDefn {

Review comment:
       Can you please add some Javadocs for these holder classes?
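
   For example, a minimal sketch of the kind of description that would help (wording is only a suggestion):
   ```
   /**
    * Describes a single column of image metadata: holds the formatted
    * column name, the original tag name, and the ScalarWriter bound to
    * the row set loader for that column.
    */
   protected abstract static class ColumnDefn {
   ```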

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageBatchReader.java
##########
@@ -0,0 +1,461 @@
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    genericColumns = new LinkedHashMap<>();
+    Collection<String> tags = GenericMetadataDirectory._tagNameMap.values();
+    for (String tagName : tags) {
+      if (!config.hasFileSystemMetadata() && ImageMetadataUtils.isSkipTag(tagName)) {
+        continue;
+      }
+      ColumnDefn columnDefn = new GenericColumnDefn(tagName);
+      if (config.isDescriptive()) {
+        columnDefn.defineText(builder);
+      } else {
+        columnDefn.define(builder);
+      }
+      genericColumns.put(ImageMetadataUtils.formatName(tagName), columnDefn);
+    }
+    Collections.unmodifiableMap(genericColumns);

Review comment:
       Do we need this?  The result is not assigned to anything.
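
   If an immutable view is intended, the result would have to be assigned back, e.g. (a sketch that assumes the field type is widened from `LinkedHashMap` to `Map<String, ColumnDefn>`):
   ```
   genericColumns = Collections.unmodifiableMap(genericColumns);
   ```
   Otherwise the call has no effect and can simply be removed.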

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatConfig.java
##########
@@ -15,59 +15,58 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.drill.exec.store.image;
 
 import java.util.List;
 import java.util.Objects;
 
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 
-@JsonTypeName("image") @JsonInclude(Include.NON_DEFAULT)
+@JsonTypeName(ImageFormatConfig.NAME)

Review comment:
       When we move this to `contrib`, can we please add a `README.md` file to explain the config options?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] luocooong commented on pull request #2141: DRILL-7536: Convert Image format to EVF

Posted by GitBox <gi...@apache.org>.
luocooong commented on pull request #2141:
URL: https://github.com/apache/drill/pull/2141#issuecomment-760285205


   @cgivre Thank you for reviewing the PR. Would you please take a look again? I will move it to the `contrib` folder once everything is done.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on pull request #2141: DRILL-7536: Convert Image format to EVF

Posted by GitBox <gi...@apache.org>.
cgivre commented on pull request #2141:
URL: https://github.com/apache/drill/pull/2141#issuecomment-757386649


   One more thing... can we move this to the `contrib` folder and include a `README`?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org