Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/11/06 05:33:45 UTC

[GitHub] jon-wei closed pull request #6360: overhaul 'druid-parquet-extensions' module, promoting from 'contrib' to 'core'

URL: https://github.com/apache/incubator-druid/pull/6360

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java
index d72c02e688e..bf33d5136ec 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java
@@ -34,10 +34,9 @@
 
 /**
  */
-public class JSONParseSpec extends ParseSpec
+public class JSONParseSpec extends NestedDataParseSpec<JSONPathSpec>
 {
   private final ObjectMapper objectMapper;
-  private final JSONPathSpec flattenSpec;
   private final Map<String, Boolean> featureSpec;
 
   @JsonCreator
@@ -48,10 +47,9 @@ public JSONParseSpec(
       @JsonProperty("featureSpec") Map<String, Boolean> featureSpec
   )
   {
-    super(timestampSpec, dimensionsSpec);
+    super(timestampSpec, dimensionsSpec, flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT);
     this.objectMapper = new ObjectMapper();
-    this.flattenSpec = flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT;
-    this.featureSpec = (featureSpec == null) ? new HashMap<String, Boolean>() : featureSpec;
+    this.featureSpec = (featureSpec == null) ? new HashMap<>() : featureSpec;
     for (Map.Entry<String, Boolean> entry : this.featureSpec.entrySet()) {
       Feature feature = Feature.valueOf(entry.getKey());
       objectMapper.configure(feature, entry.getValue());
@@ -72,7 +70,7 @@ public void verify(List<String> usedCols)
   @Override
   public Parser<String, Object> makeParser()
   {
-    return new JSONPathParser(flattenSpec, objectMapper);
+    return new JSONPathParser(getFlattenSpec(), objectMapper);
   }
 
   @Override
@@ -87,12 +85,6 @@ public ParseSpec withDimensionsSpec(DimensionsSpec spec)
     return new JSONParseSpec(getTimestampSpec(), spec, getFlattenSpec(), getFeatureSpec());
   }
 
-  @JsonProperty
-  public JSONPathSpec getFlattenSpec()
-  {
-    return flattenSpec;
-  }
-
   @JsonProperty
   public Map<String, Boolean> getFeatureSpec()
   {
@@ -112,14 +104,13 @@ public boolean equals(final Object o)
       return false;
     }
     final JSONParseSpec that = (JSONParseSpec) o;
-    return Objects.equals(flattenSpec, that.flattenSpec) &&
-           Objects.equals(featureSpec, that.featureSpec);
+    return Objects.equals(featureSpec, that.featureSpec);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(super.hashCode(), flattenSpec, featureSpec);
+    return Objects.hash(super.hashCode(), featureSpec);
   }
 
   @Override
@@ -128,7 +119,7 @@ public String toString()
     return "JSONParseSpec{" +
            "timestampSpec=" + getTimestampSpec() +
            ", dimensionsSpec=" + getDimensionsSpec() +
-           ", flattenSpec=" + flattenSpec +
+           ", flattenSpec=" + getFlattenSpec() +
            ", featureSpec=" + featureSpec +
            '}';
   }
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/NestedDataParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/NestedDataParseSpec.java
new file mode 100644
index 00000000000..f8d89ee980e
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/data/input/impl/NestedDataParseSpec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
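+/**
+ * Base {@link ParseSpec} for nested data formats that accept a flattenSpec. The concrete
+ * flattenSpec type (for example {@link org.apache.druid.java.util.common.parsers.JSONPathSpec}
+ * for JSON and Avro) is supplied by subclasses.
+ */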
+public abstract class NestedDataParseSpec<TFlattenSpec> extends ParseSpec
+{
+  private final TFlattenSpec flattenSpec;
+
+  protected NestedDataParseSpec(
+      @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
+      @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
+      @JsonProperty("flattenSpec") TFlattenSpec flattenSpec
+  )
+  {
+    super(timestampSpec, dimensionsSpec);
+    this.flattenSpec = flattenSpec;
+  }
+
+  @JsonProperty
+  public TFlattenSpec getFlattenSpec()
+  {
+    return flattenSpec;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    NestedDataParseSpec that = (NestedDataParseSpec) o;
+    return Objects.equals(flattenSpec, that.flattenSpec);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), flattenSpec);
+  }
+}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroMappingProvider.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/NotImplementedMappingProvider.java
similarity index 73%
rename from extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroMappingProvider.java
rename to core/src/main/java/org/apache/druid/java/util/common/parsers/NotImplementedMappingProvider.java
index a1d50c2d3a9..448ae67607e 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroMappingProvider.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/NotImplementedMappingProvider.java
@@ -17,28 +17,23 @@
  * under the License.
  */
 
-package org.apache.druid.data.input.avro;
+package org.apache.druid.java.util.common.parsers;
 
 import com.jayway.jsonpath.Configuration;
 import com.jayway.jsonpath.TypeRef;
 import com.jayway.jsonpath.spi.mapper.MappingProvider;
 
-/**
- * MappingProvider for JsonPath + Avro.
- */
-public class GenericAvroMappingProvider implements MappingProvider
+public class NotImplementedMappingProvider implements MappingProvider
 {
   @Override
-  public <T> T map(final Object o, final Class<T> aClass, final Configuration configuration)
+  public <T> T map(Object source, Class<T> targetType, Configuration configuration)
   {
-    // Not used by us.
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public <T> T map(final Object o, final TypeRef<T> typeRef, final Configuration configuration)
+  public <T> T map(Object source, TypeRef<T> targetType, Configuration configuration)
   {
-    // Not used by us.
     throw new UnsupportedOperationException();
   }
 }
diff --git a/distribution/pom.xml b/distribution/pom.xml
index c7706cc5da8..88c9f8bff94 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -147,6 +147,8 @@
                                         <argument>-c</argument>
                                         <argument>org.apache.druid.extensions:mysql-metadata-storage</argument>
                                         <argument>-c</argument>
+                                        <argument>org.apache.druid.extensions:druid-parquet-extensions</argument>
+                                        <argument>-c</argument>
                                         <argument>org.apache.druid.extensions:postgresql-metadata-storage</argument>
                                         <argument>-c</argument>
                                         <argument>org.apache.druid.extensions:druid-kerberos</argument>
@@ -272,8 +274,6 @@
                                         <argument>-c</argument>
                                         <argument>org.apache.druid.extensions.contrib:druid-orc-extensions</argument>
                                         <argument>-c</argument>
-                                        <argument>org.apache.druid.extensions.contrib:druid-parquet-extensions</argument>
-                                        <argument>-c</argument>
                                         <argument>org.apache.druid.extensions.contrib:druid-rabbitmq</argument>
                                         <argument>-c</argument>
                                         <argument>org.apache.druid.extensions.contrib:druid-redis-cache</argument>
diff --git a/docs/content/development/extensions-contrib/parquet.md b/docs/content/development/extensions-contrib/parquet.md
deleted file mode 100644
index 428ce526d08..00000000000
--- a/docs/content/development/extensions-contrib/parquet.md
+++ /dev/null
@@ -1,158 +0,0 @@
----
-layout: doc_page
----
-
-# Ingestion using Parquet format
-
-To use this extension, make sure to [include](../../operations/including-extensions.html) both `druid-avro-extensions` and `druid-parquet-extensions`.
-
-This extension enables Druid to ingest and understand the Apache Parquet data format offline.
-
-## Parquet Hadoop Parser
-
-This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of `inputSpec` in `ioConfig` must be set to `"org.apache.druid.data.input.parquet.DruidParquetInputFormat"`.
-
-|Field     | Type        | Description                                                                            | Required|
-|----------|-------------|----------------------------------------------------------------------------------------|---------|
-| type      | String      | This should say `parquet`                                                              | yes |
-| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a timeAndDims parseSpec. | yes |
-| binaryAsString | Boolean | Specifies if the bytes parquet column should be converted to strings. | no(default == false) |
-
-When the time dimension is a [DateType column](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md), a format should not be supplied. When the format is UTF8 (String), either `auto` or a explicitly defined [format](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html) is required.
-
-### Example json for overlord
-
-When posting the index job to the overlord, setting the correct `inputFormat` is required to switch to parquet ingestion. Make sure to set `jobProperties` to make hdfs path timezone unrelated:
-
-```json
-{
-  "type": "index_hadoop",
-  "spec": {
-    "ioConfig": {
-      "type": "hadoop",
-      "inputSpec": {
-        "type": "static",
-        "inputFormat": "org.apache.druid.data.input.parquet.DruidParquetInputFormat",
-        "paths": "no_metrics"
-      }
-    },
-    "dataSchema": {
-      "dataSource": "no_metrics",
-      "parser": {
-        "type": "parquet",
-        "parseSpec": {
-          "format": "timeAndDims",
-          "timestampSpec": {
-            "column": "time",
-            "format": "auto"
-          },
-          "dimensionsSpec": {
-            "dimensions": [
-              "name"
-            ],
-            "dimensionExclusions": [],
-            "spatialDimensions": []
-          }
-        }
-      },
-      "metricsSpec": [{
-        "type": "count",
-        "name": "count"
-      }],
-      "granularitySpec": {
-        "type": "uniform",
-        "segmentGranularity": "DAY",
-        "queryGranularity": "ALL",
-        "intervals": ["2015-12-31/2016-01-02"]
-      }
-    },
-    "tuningConfig": {
-      "type": "hadoop",
-      "partitionsSpec": {
-        "targetPartitionSize": 5000000
-      },
-      "jobProperties" : {},
-      "leaveIntermediate": true
-    }
-  }
-}
-```
-
-### Example json for standalone jvm
-When using a standalone JVM instead, additional configuration fields are required. You can just fire a hadoop job with your local compiled jars like:
-
-```bash
-HADOOP_CLASS_PATH=`hadoop classpath | sed s/*.jar/*/g`
-
-java -Xmx32m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
-  -classpath config/overlord:config/_common:lib/*:$HADOOP_CLASS_PATH:extensions/druid-avro-extensions/*  \
-  org.apache.druid.cli.Main index hadoop \
-  wikipedia_hadoop_parquet_job.json
-```
-
-An example index json when using the standalone JVM:
-
-```json
-{
-  "type": "index_hadoop",
-  "spec": {
-    "ioConfig": {
-      "type": "hadoop",
-      "inputSpec": {
-        "type": "static",
-        "inputFormat": "org.apache.druid.data.input.parquet.DruidParquetInputFormat",
-        "paths": "no_metrics"
-      },
-      "metadataUpdateSpec": {
-        "type": "postgresql",
-        "connectURI": "jdbc:postgresql://localhost/druid",
-        "user" : "druid",
-        "password" : "asdf",
-        "segmentTable": "druid_segments"
-      },
-      "segmentOutputPath": "tmp/segments"
-    },
-    "dataSchema": {
-      "dataSource": "no_metrics",
-      "parser": {
-        "type": "parquet",
-        "parseSpec": {
-          "format": "timeAndDims",
-          "timestampSpec": {
-            "column": "time",
-            "format": "auto"
-          },
-          "dimensionsSpec": {
-            "dimensions": [
-              "name"
-            ],
-            "dimensionExclusions": [],
-            "spatialDimensions": []
-          }
-        }
-      },
-      "metricsSpec": [{
-        "type": "count",
-        "name": "count"
-      }],
-      "granularitySpec": {
-        "type": "uniform",
-        "segmentGranularity": "DAY",
-        "queryGranularity": "ALL",
-        "intervals": ["2015-12-31/2016-01-02"]
-      }
-    },
-    "tuningConfig": {
-      "type": "hadoop",
-      "workingPath": "tmp/working_path",
-      "partitionsSpec": {
-        "targetPartitionSize": 5000000
-      },
-      "jobProperties" : {},
-      "leaveIntermediate": true
-    }
-  }
-}
-```
-
-Almost all the fields listed above are required, including `inputFormat`, `metadataUpdateSpec`(`type`, `connectURI`, `user`, `password`, `segmentTable`).
diff --git a/docs/content/development/extensions-core/parquet.md b/docs/content/development/extensions-core/parquet.md
new file mode 100644
index 00000000000..3bcdbc80806
--- /dev/null
+++ b/docs/content/development/extensions-core/parquet.md
@@ -0,0 +1,200 @@
+---
+layout: doc_page
+---
+
+# Druid Parquet Extension
+
+This module extends [Druid Hadoop-based indexing](../../ingestion/hadoop.html) to ingest data directly from offline
+Apache Parquet files.
+
+Note: `druid-parquet-extensions` depends on the `druid-avro-extensions` module, so be sure to
+[include both](../../operations/including-extensions.html).
+
+## Parquet Hadoop Parser
+
+This extension provides two ways to parse Parquet files:
+* `parquet` - a simple conversion contained within this extension
+* `parquet-avro` - conversion to Avro records with the `parquet-avro` library, which are then parsed by the
+ `druid-avro-extensions` module
+
+The conversion method is selected by the parser type, and the matching Hadoop input format must also be set in
+the `ioConfig`: `org.apache.druid.data.input.parquet.simple.DruidParquetInputFormat` for `parquet`, or
+`org.apache.druid.data.input.parquet.avro.DruidParquetAvroInputFormat` for `parquet-avro`.
+
+Both parse options support auto field discovery and flattening if provided with a
+[flattenSpec](../../ingestion/flatten-json.html) with `parquet` or `avro` as the `format`. Parquet nested list and map
+[logical types](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md) _should_ operate correctly with
+JSON path expressions for all supported types. `parquet-avro` sets the Hadoop job property
+`parquet.avro.add-list-element-records` to `false` (which normally defaults to `true`) in order to 'unwrap' primitive
+list elements into multi-value dimensions.
+
+The `parquet` parser supports `int96` Parquet values, while `parquet-avro` does not. There may also be some subtle
+differences in how the two parsers evaluate the JSON path expressions of a `flattenSpec`.
+
+We suggest using `parquet` over `parquet-avro` to allow ingesting data beyond the schema constraints of Avro conversion. 
+However, `parquet-avro` was the original basis for this extension, and as such it is a bit more mature.
+
+
+|Field     | Type        | Description                                                                            | Required|
+|----------|-------------|----------------------------------------------------------------------------------------|---------|
+| type      | String      | Choose `parquet` or `parquet-avro` to determine how Parquet files are parsed | yes |
+| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data, and optionally a flatten spec. Valid parseSpec formats are `timeAndDims`, `parquet`, and `avro` (when used with Avro conversion). | yes |
+| binaryAsString | Boolean | Specifies whether Parquet byte columns that are not logically marked as string or enum types should be converted to strings anyway. | no (default == false) |
+
+When the time dimension is a [DateType column](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md), a format should not be supplied. When the column is UTF8 (String), either `auto` or an explicitly defined [format](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html) is required.
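+
+For example, a minimal sketch (the column name `ts` is illustrative): a DateType time column needs no `format`:
+
+```json
+"timestampSpec": {
+  "column": "ts"
+}
+```
+
+while a UTF8 (String) time column requires one, e.g. `auto`:
+
+```json
+"timestampSpec": {
+  "column": "ts",
+  "format": "auto"
+}
+```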
+
+### Examples
+
+#### `parquet` parser, `parquet` parseSpec
+```json
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "org.apache.druid.data.input.parquet.simple.DruidParquetInputFormat",
+        "paths": "path/to/file.parquet"
+      },
+      ...
+    },
+    "dataSchema": {
+      "dataSource": "example",
+      "parser": {
+        "type": "parquet",
+        "parseSpec": {
+          "format": "parquet",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": [
+              {
+                "type": "path",
+                "name": "nestedDim",
+                "expr": "$.nestedData.dim1"
+              },
+              {
+                "type": "path",
+                "name": "listDimFirstItem",
+                "expr": "$.listDim[1]"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto"
+          },
+          "dimensionsSpec": {
+            "dimensions": [],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      ...
+    },
+    "tuningConfig": <hadoop-tuning-config>
+    }
+  }
+}
+```
+
+#### `parquet` parser, `timeAndDims` parseSpec
+```json
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "org.apache.druid.data.input.parquet.simple.DruidParquetInputFormat",
+        "paths": "path/to/file.parquet"
+      },
+      ...
+    },
+    "dataSchema": {
+      "dataSource": "example",
+      "parser": {
+        "type": "parquet",
+        "parseSpec": {
+          "format": "timeAndDims",
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto"
+          },
+          "dimensionsSpec": {
+            "dimensions": [
+              "dim1",
+              "dim2",
+              "dim3",
+              "listDim"
+            ],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      ...
+    },
+    "tuningConfig": <hadoop-tuning-config>
+  }
+}
+
+```
+#### `parquet-avro` parser, `avro` parseSpec
+```json
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "org.apache.druid.data.input.parquet.avro.DruidParquetAvroInputFormat",
+        "paths": "path/to/file.parquet"
+      },
+      ...
+    },
+    "dataSchema": {
+      "dataSource": "example",
+      "parser": {
+        "type": "parquet-avro",
+        "parseSpec": {
+          "format": "avro",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": [
+              {
+                "type": "path",
+                "name": "nestedDim",
+                "expr": "$.nestedData.dim1"
+              },
+              {
+                "type": "path",
+                "name": "listDimFirstItem",
+                "expr": "$.listDim[1]"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto"
+          },
+          "dimensionsSpec": {
+            "dimensions": [],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      ...
+    },
+    "tuningConfig": <hadoop-tuning-config>
+    }
+  }
+}
+```
+
+For additional details see the [Hadoop ingestion](../../ingestion/hadoop.html) and [general ingestion spec](../../ingestion/ingestion-spec.html) documentation.
\ No newline at end of file
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index 7a1e5dd70a4..7b07f4077b8 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -34,6 +34,7 @@ Core extensions are maintained by Druid committers.
 |druid-kerberos|Kerberos authentication for druid nodes.|[link](../development/extensions-core/druid-kerberos.html)|
 |druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)|
 |druid-lookups-cached-single| Per lookup caching module to support the use cases where a lookup need to be isolated from the global pool of lookups |[link](../development/extensions-core/druid-lookups.html)|
+|druid-parquet-extensions|Support for data in Apache Parquet data format. Requires druid-avro-extensions to be loaded.|[link](../development/extensions-core/parquet.html)|
 |druid-protobuf-extensions| Support for data in Protobuf data format.|[link](../development/extensions-core/protobuf.html)|
 |druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)|
 |druid-stats|Statistics related module including variance and standard deviation.|[link](../development/extensions-core/stats.html)|
@@ -61,7 +62,6 @@ All of these community extensions can be downloaded using *pull-deps* with the c
 |druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
 |druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer).|[link](../development/extensions-contrib/kafka-simple.html)|
 |druid-orc-extensions|Support for data in Apache Orc data format.|[link](../development/extensions-contrib/orc.html)|
-|druid-parquet-extensions|Support for data in Apache Parquet data format. Requires druid-avro-extensions to be loaded.|[link](../development/extensions-contrib/parquet.html)|
 |druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
 |druid-redis-cache|A cache implementation for Druid based on Redis.|[link](../development/extensions-contrib/redis-cache.html)|
 |druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
diff --git a/docs/content/operations/other-hadoop.md b/docs/content/operations/other-hadoop.md
index 59556b9d095..f4770ea54f2 100644
--- a/docs/content/operations/other-hadoop.md
+++ b/docs/content/operations/other-hadoop.md
@@ -201,7 +201,7 @@ If sbt is not your choice, you can also use `maven-shade-plugin` to make a fat j
   </dependency>
 
   <dependency>
-      <groupId>org.apache.druid.extensions.contrib</groupId>
+      <groupId>org.apache.druid.extensions</groupId>
       <artifactId>druid-parquet-extensions</artifactId>
       <version>${project.parent.version}</version>
   </dependency>
diff --git a/extensions-contrib/parquet-extensions/pom.xml b/extensions-contrib/parquet-extensions/pom.xml
deleted file mode 100644
index ac9d88634cd..00000000000
--- a/extensions-contrib/parquet-extensions/pom.xml
+++ /dev/null
@@ -1,72 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <groupId>org.apache.druid.extensions.contrib</groupId>
-    <artifactId>druid-parquet-extensions</artifactId>
-    <name>druid-parquet-extensions</name>
-    <description>druid-parquet-extensions</description>
-
-    <parent>
-        <artifactId>druid</artifactId>
-        <groupId>org.apache.druid</groupId>
-        <version>0.13.0-incubating-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.druid.extensions</groupId>
-            <artifactId>druid-avro-extensions</artifactId>
-            <version>${project.parent.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.parquet</groupId>
-            <artifactId>parquet-avro</artifactId>
-            <version>1.10.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.xerial.snappy</groupId>
-            <artifactId>snappy-java</artifactId>
-            <version>1.1.4</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.druid</groupId>
-            <artifactId>druid-indexing-hadoop</artifactId>
-            <version>${project.parent.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-</project>
diff --git a/extensions-contrib/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/GenericRecordAsMap.java b/extensions-contrib/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/GenericRecordAsMap.java
deleted file mode 100644
index b7acc12ebe2..00000000000
--- a/extensions-contrib/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/GenericRecordAsMap.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.druid.data.input.parquet;
-
-import com.google.common.base.Preconditions;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.util.Utf8;
-import org.apache.druid.java.util.common.StringUtils;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-public class GenericRecordAsMap implements Map<String, Object>
-{
-  private final GenericRecord record;
-  private final boolean binaryAsString;
-
-  public GenericRecordAsMap(
-      GenericRecord record,
-      boolean binaryAsString
-  )
-  {
-    this.record = Preconditions.checkNotNull(record, "record");
-    this.binaryAsString = binaryAsString;
-  }
-
-  @Override
-  public int size()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean isEmpty()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean containsKey(Object key)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean containsValue(Object value)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * When used in MapBasedRow, field in GenericRecord will be interpret as follows:
-   * <ul>
-   * <li> avro schema type -> druid dimension:</li>
-   * <ul>
-   * <li>null, boolean, int, long, float, double, string, Records, Enums, Maps, Fixed -> String, using String.valueOf</li>
-   * <li>bytes -> Arrays.toString() or new String if binaryAsString is true</li>
-   * <li>Arrays -> List&lt;String&gt;, using Lists.transform(&lt;List&gt;dimValue, TO_STRING_INCLUDING_NULL)</li>
-   * </ul>
-   * <li> avro schema type -> druid metric:</li>
-   * <ul>
-   * <li>null -> 0F/0L</li>
-   * <li>int, long, float, double -> Float/Long, using Number.floatValue()/Number.longValue()</li>
-   * <li>string -> Float/Long, using Float.valueOf()/Long.valueOf()</li>
-   * <li>boolean, bytes, Arrays, Records, Enums, Maps, Fixed -> ParseException</li>
-   * </ul>
-   * </ul>
-   */
-  @Override
-  public Object get(Object key)
-  {
-    Object field = record.get(key.toString());
-    if (field instanceof ByteBuffer) {
-      if (binaryAsString) {
-        return StringUtils.fromUtf8(((ByteBuffer) field).array());
-      } else {
-        return Arrays.toString(((ByteBuffer) field).array());
-      }
-    }
-    if (field instanceof Utf8) {
-      return field.toString();
-    }
-    return field;
-  }
-
-  @Override
-  public Object put(String key, Object value)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Object remove(Object key)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void putAll(Map<? extends String, ?> m)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void clear()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Set<String> keySet()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Collection<Object> values()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Set<Entry<String, Object>> entrySet()
-  {
-    throw new UnsupportedOperationException();
-  }
-}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
index 72d7a221013..facdaa043a7 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
@@ -28,6 +28,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider;
 import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
 
 import java.nio.ByteBuffer;
@@ -39,23 +40,52 @@
 
 public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<GenericRecord>
 {
-  private static final Configuration JSONPATH_CONFIGURATION =
+  static final Configuration JSONPATH_CONFIGURATION =
       Configuration.builder()
                    .jsonProvider(new GenericAvroJsonProvider())
-                   .mappingProvider(new GenericAvroMappingProvider())
+                   .mappingProvider(new NotImplementedMappingProvider())
                    .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
                    .build();
 
   private static final EnumSet<Schema.Type> ROOT_TYPES = EnumSet.of(
       Schema.Type.STRING,
       Schema.Type.BYTES,
-      Schema.Type.ARRAY,
       Schema.Type.INT,
       Schema.Type.LONG,
       Schema.Type.FLOAT,
       Schema.Type.DOUBLE
   );
 
+  public static boolean isPrimitive(Schema schema)
+  {
+    return ROOT_TYPES.contains(schema.getType());
+  }
+
+  public static boolean isPrimitiveArray(Schema schema)
+  {
+    return schema.getType().equals(Schema.Type.ARRAY) && isPrimitive(schema.getElementType());
+  }
+
+  public static boolean isOptionalPrimitive(Schema schema)
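+  /**
+   * Returns true for a UNION of exactly two types where one is NULL and the other is a primitive
+   * type or an array of primitives (in either order), i.e. an Avro "optional" primitive field.
+   */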
+  {
+    return schema.getType().equals(Schema.Type.UNION) &&
+           schema.getTypes().size() == 2 &&
+           (
+               (schema.getTypes().get(0).getType().equals(Schema.Type.NULL) &&
+                (isPrimitive(schema.getTypes().get(1)) || isPrimitiveArray(schema.getTypes().get(1)))) ||
+               (schema.getTypes().get(1).getType().equals(Schema.Type.NULL) &&
+                (isPrimitive(schema.getTypes().get(0)) || isPrimitiveArray(schema.getTypes().get(0))))
+           );
+  }
+
+  static boolean isFieldPrimitive(Schema.Field field)
+  {
+    return isPrimitive(field.schema()) ||
+           isPrimitiveArray(field.schema()) ||
+           isOptionalPrimitive(field.schema());
+  }
+
+
   private final boolean fromPigAvroStorage;
   private final boolean binaryAsString;
 
@@ -71,7 +101,7 @@ public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binary
     return obj.getSchema()
               .getFields()
               .stream()
-              .filter(field -> ROOT_TYPES.contains(field.schema().getType()))
+              .filter(AvroFlattenerMaker::isFieldPrimitive)
               .map(Schema.Field::name)
               .collect(Collectors.toSet());
   }
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParseSpec.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParseSpec.java
index 03dd1703d9b..c7912e38dde 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParseSpec.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParseSpec.java
@@ -20,22 +20,16 @@
 package org.apache.druid.data.input.avro;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.NestedDataParseSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.common.parsers.Parser;
 
-import java.util.Objects;
-
-public class AvroParseSpec extends ParseSpec
+public class AvroParseSpec extends NestedDataParseSpec<JSONPathSpec>
 {
-
-  @JsonIgnore
-  private final JSONPathSpec flattenSpec;
-
   @JsonCreator
   public AvroParseSpec(
       @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@@ -45,16 +39,9 @@ public AvroParseSpec(
   {
     super(
         timestampSpec != null ? timestampSpec : new TimestampSpec(null, null, null),
-        dimensionsSpec != null ? dimensionsSpec : new DimensionsSpec(null, null, null)
+        dimensionsSpec != null ? dimensionsSpec : DimensionsSpec.EMPTY,
+        flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT
     );
-
-    this.flattenSpec = flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT;
-  }
-
-  @JsonProperty
-  public JSONPathSpec getFlattenSpec()
-  {
-    return flattenSpec;
   }
 
   @Override
@@ -67,34 +54,12 @@ public JSONPathSpec getFlattenSpec()
   @Override
   public ParseSpec withTimestampSpec(TimestampSpec spec)
   {
-    return new AvroParseSpec(spec, getDimensionsSpec(), flattenSpec);
+    return new AvroParseSpec(spec, getDimensionsSpec(), getFlattenSpec());
   }
 
   @Override
   public ParseSpec withDimensionsSpec(DimensionsSpec spec)
   {
-    return new AvroParseSpec(getTimestampSpec(), spec, flattenSpec);
-  }
-
-  @Override
-  public boolean equals(final Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
-    final AvroParseSpec that = (AvroParseSpec) o;
-    return Objects.equals(flattenSpec, that.flattenSpec);
-  }
-
-  @Override
-  public int hashCode()
-  {
-    return Objects.hash(super.hashCode(), flattenSpec);
+    return new AvroParseSpec(getTimestampSpec(), spec, getFlattenSpec());
   }
 }
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
index e20091dc074..015ff142bf8 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
@@ -88,6 +88,7 @@
       "someStringArray",
       "someIntArray",
       "someFloat",
+      "someUnion",
       EVENT_TYPE,
       ID,
       "someBytes",
diff --git a/extensions-contrib/parquet-extensions/example/284a0e001476716b-56d5676f53bd6e85_115466471_data.0.parq b/extensions-core/parquet-extensions/example/compat/284a0e001476716b-56d5676f53bd6e85_115466471_data.0.parq
similarity index 100%
rename from extensions-contrib/parquet-extensions/example/284a0e001476716b-56d5676f53bd6e85_115466471_data.0.parq
rename to extensions-core/parquet-extensions/example/compat/284a0e001476716b-56d5676f53bd6e85_115466471_data.0.parq
diff --git a/extensions-contrib/parquet-extensions/example/impala_hadoop_parquet_job.json b/extensions-core/parquet-extensions/example/compat/impala_hadoop_parquet_job.json
similarity index 78%
rename from extensions-contrib/parquet-extensions/example/impala_hadoop_parquet_job.json
rename to extensions-core/parquet-extensions/example/compat/impala_hadoop_parquet_job.json
index 4e2c5468a27..0826ecbfca1 100644
--- a/extensions-contrib/parquet-extensions/example/impala_hadoop_parquet_job.json
+++ b/extensions-core/parquet-extensions/example/compat/impala_hadoop_parquet_job.json
@@ -5,14 +5,14 @@
       "type": "hadoop",
       "inputSpec": {
         "type": "static",
-        "inputFormat": "org.apache.druid.data.input.parquet.DruidParquetInputFormat",
-        "paths": "example/284a0e001476716b-56d5676f53bd6e85_115466471_data.0.parq"
+        "inputFormat": "%s",
+        "paths": "example/compat/284a0e001476716b-56d5676f53bd6e85_115466471_data.0.parq"
       },
       "metadataUpdateSpec": {
         "type": "postgresql",
         "connectURI": "jdbc:postgresql://localhost/druid",
-        "user" : "druid",
-        "password" : "asdf",
+        "user": "druid",
+        "password": "asdf",
         "segmentTable": "druid_segments"
       },
       "segmentOutputPath": "/tmp/segments"
@@ -20,7 +20,7 @@
     "dataSchema": {
       "dataSource": "impala",
       "parser": {
-        "type": "parquet",
+        "type": "%s",
         "binaryAsString": true,
         "parseSpec": {
           "format": "timeAndDims",
@@ -37,15 +37,19 @@
           }
         }
       },
-      "metricsSpec": [{
-        "type": "count",
-        "name": "count"
-      }],
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        }
+      ],
       "granularitySpec": {
         "type": "uniform",
         "segmentGranularity": "DAY",
         "queryGranularity": "NONE",
-        "intervals": ["2013-08-30/2013-09-02"]
+        "intervals": [
+          "2013-08-30/2013-09-02"
+        ]
       }
     },
     "tuningConfig": {
@@ -54,7 +58,7 @@
       "partitionsSpec": {
         "targetPartitionSize": 5000000
       },
-      "jobProperties" : {
+      "jobProperties": {
         "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
         "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
         "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
diff --git a/extensions-core/parquet-extensions/example/compat/nested-array-struct.parquet b/extensions-core/parquet-extensions/example/compat/nested-array-struct.parquet
new file mode 100644
index 00000000000..41a43fa35d3
Binary files /dev/null and b/extensions-core/parquet-extensions/example/compat/nested-array-struct.parquet differ
diff --git a/extensions-core/parquet-extensions/example/compat/nested_array_struct.json b/extensions-core/parquet-extensions/example/compat/nested_array_struct.json
new file mode 100644
index 00000000000..94f19ace504
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/compat/nested_array_struct.json
@@ -0,0 +1,89 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/compat/nested-array-struct.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "compat",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": [
+              {
+                "type": "path",
+                "name": "extracted1",
+                "expr": "$.myComplex[0].id"
+              },
+              {
+                "type": "path",
+                "name": "extracted2",
+                "expr": "$.myComplex[0].repeatedMessage[*].someId"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto",
+            "missingValue": "2018-09-01T00:00:00.000Z"
+          },
+          "dimensionsSpec": {
+            "dimensions": [
+              "i32_dec"
+            ],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "doubleSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/compat/old-repeated-int.parquet b/extensions-core/parquet-extensions/example/compat/old-repeated-int.parquet
new file mode 100644
index 00000000000..520922f73eb
Binary files /dev/null and b/extensions-core/parquet-extensions/example/compat/old-repeated-int.parquet differ
diff --git a/extensions-core/parquet-extensions/example/compat/old_repeated_int.json b/extensions-core/parquet-extensions/example/compat/old_repeated_int.json
new file mode 100644
index 00000000000..335c740802a
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/compat/old_repeated_int.json
@@ -0,0 +1,76 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/compat/old-repeated-int.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "test",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": [
+              {
+                "type": "root",
+                "name": "repeatedInt"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto",
+            "missingValue": "2018-09-01T00:00:00.000Z"
+          },
+          "dimensionsSpec": {
+            "dimensions": [],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/compat/parquet-1217.parquet b/extensions-core/parquet-extensions/example/compat/parquet-1217.parquet
new file mode 100644
index 00000000000..eb2dc4f7990
Binary files /dev/null and b/extensions-core/parquet-extensions/example/compat/parquet-1217.parquet differ
diff --git a/extensions-core/parquet-extensions/example/compat/parquet-thrift-compat.snappy.parquet b/extensions-core/parquet-extensions/example/compat/parquet-thrift-compat.snappy.parquet
new file mode 100644
index 00000000000..837e4876eea
Binary files /dev/null and b/extensions-core/parquet-extensions/example/compat/parquet-thrift-compat.snappy.parquet differ
diff --git a/extensions-core/parquet-extensions/example/compat/parquet_1217.json b/extensions-core/parquet-extensions/example/compat/parquet_1217.json
new file mode 100644
index 00000000000..cf5f12ef362
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/compat/parquet_1217.json
@@ -0,0 +1,86 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/compat/parquet-1217.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "test",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": [
+              {
+                "type": "root",
+                "name": "col"
+              },
+              {
+                "type": "path",
+                "name": "metric1",
+                "expr": "$.col"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto",
+            "missingValue": "2018-09-01T00:00:00.000Z"
+          },
+          "dimensionsSpec": {
+            "dimensions": [],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "longSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/compat/parquet_thrift_compat.json b/extensions-core/parquet-extensions/example/compat/parquet_thrift_compat.json
new file mode 100644
index 00000000000..ef613dacfdd
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/compat/parquet_thrift_compat.json
@@ -0,0 +1,87 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/compat/parquet-thrift-compat.snappy.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "test",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": [
+              {
+                "type": "path",
+                "name": "extractByLogicalMap",
+                "expr": "$.intToStringColumn.1"
+              },
+              {
+                "type": "path",
+                "name": "extractByComplexLogicalMap",
+                "expr": "$.complexColumn.1[0].nestedIntsColumn[1]"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto",
+            "missingValue": "2018-09-01T00:00:00.000Z"
+          },
+          "dimensionsSpec": {
+            "dimensions": [],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "longSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/compat/proto-struct-with-array.parquet b/extensions-core/parquet-extensions/example/compat/proto-struct-with-array.parquet
new file mode 100644
index 00000000000..325a8370ad2
Binary files /dev/null and b/extensions-core/parquet-extensions/example/compat/proto-struct-with-array.parquet differ
diff --git a/extensions-core/parquet-extensions/example/compat/proto_struct_with_array.json b/extensions-core/parquet-extensions/example/compat/proto_struct_with_array.json
new file mode 100644
index 00000000000..7909e0ee12c
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/compat/proto_struct_with_array.json
@@ -0,0 +1,87 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/compat/proto-struct-with-array.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "flat",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": [
+              {
+                "type": "path",
+                "name": "extractedOptional",
+                "expr": "$.optionalMessage.someId"
+              },
+              {
+                "type": "path",
+                "name": "extractedRequired",
+                "expr": "$.requiredMessage.someId"
+              },
+              {
+                "type": "path",
+                "name": "extractedRepeated",
+                "expr": "$.repeatedMessage[*]"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto",
+            "missingValue": "2018-09-01T00:00:00.000Z"
+          },
+          "dimensionsSpec": {
+            "dimensions": [],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
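
The flattenSpec in proto_struct_with_array.json shows the three shapes these examples rely on: field discovery for flat columns, "$.someMessage.someId" paths into nested structs, and "$.repeatedMessage[*]" to turn a repeated group into a multi-value column. Below is a minimal sketch of the same spec built programmatically, assuming the JSONPathSpec and JSONPathFieldSpec constructors from Druid's java-util parsers package; the class name is illustrative only:

    import com.google.common.collect.ImmutableList;
    import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
    import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
    import org.apache.druid.java.util.common.parsers.JSONPathSpec;

    public class ProtoStructFlattenSpecSketch
    {
      // Same fields as the JSON flattenSpec above: two scalar extractions from
      // nested messages plus a wildcard that flattens the repeated message.
      public static JSONPathSpec exampleFlattenSpec()
      {
        return new JSONPathSpec(
            true, // useFieldDiscovery
            ImmutableList.of(
                new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractedOptional", "$.optionalMessage.someId"),
                new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractedRequired", "$.requiredMessage.someId"),
                new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractedRepeated", "$.repeatedMessage[*]")
            )
        );
      }
    }
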
diff --git a/extensions-core/parquet-extensions/example/decimals/dec-in-fixed-len.parquet b/extensions-core/parquet-extensions/example/decimals/dec-in-fixed-len.parquet
new file mode 100644
index 00000000000..6ad37d56395
Binary files /dev/null and b/extensions-core/parquet-extensions/example/decimals/dec-in-fixed-len.parquet differ
diff --git a/extensions-core/parquet-extensions/example/decimals/dec-in-i32.parquet b/extensions-core/parquet-extensions/example/decimals/dec-in-i32.parquet
new file mode 100755
index 00000000000..bb5d4af8dd3
Binary files /dev/null and b/extensions-core/parquet-extensions/example/decimals/dec-in-i32.parquet differ
diff --git a/extensions-core/parquet-extensions/example/decimals/dec-in-i64.parquet b/extensions-core/parquet-extensions/example/decimals/dec-in-i64.parquet
new file mode 100755
index 00000000000..e07c4a0ad98
Binary files /dev/null and b/extensions-core/parquet-extensions/example/decimals/dec-in-i64.parquet differ
diff --git a/extensions-core/parquet-extensions/example/decimals/dec_in_fix_len.json b/extensions-core/parquet-extensions/example/decimals/dec_in_fix_len.json
new file mode 100644
index 00000000000..491e29deb90
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/decimals/dec_in_fix_len.json
@@ -0,0 +1,88 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/decimals/dec-in-fixed-len.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "decimal",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": [
+              {
+                "type": "root",
+                "name": "fixed_len_dec"
+              },
+              {
+                "type": "path",
+                "name": "metric1",
+                "expr": "$.fixed_len_dec"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto",
+            "missingValue": "2018-09-01T00:00:00.000Z"
+          },
+          "dimensionsSpec": {
+            "dimensions": [
+              "fixed_len_dec"
+            ],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "doubleSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/decimals/dec_in_i32.json b/extensions-core/parquet-extensions/example/decimals/dec_in_i32.json
new file mode 100644
index 00000000000..28d1d452971
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/decimals/dec_in_i32.json
@@ -0,0 +1,88 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/decimals/dec-in-i32.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "decimal",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": [
+              {
+                "type": "root",
+                "name": "i32_dec"
+              },
+              {
+                "type": "path",
+                "name": "metric1",
+                "expr": "$.i32_dec"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto",
+            "missingValue": "2018-09-01T00:00:00.000Z"
+          },
+          "dimensionsSpec": {
+            "dimensions": [
+              "i32_dec"
+            ],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "doubleSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/decimals/dec_in_i64.json b/extensions-core/parquet-extensions/example/decimals/dec_in_i64.json
new file mode 100644
index 00000000000..5af1a1dae08
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/decimals/dec_in_i64.json
@@ -0,0 +1,88 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/decimals/dec-in-i64.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "decimal",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": [
+              {
+                "type": "root",
+                "name": "i64_dec"
+              },
+              {
+                "type": "path",
+                "name": "metric1",
+                "expr": "$.i64_dec"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto",
+            "missingValue": "2018-09-01T00:00:00.000Z"
+          },
+          "dimensionsSpec": {
+            "dimensions": [
+              "i64_dec"
+            ],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "doubleSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
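
The decimal examples follow the same template pattern as the rest of these specs: "inputFormat", the parser "type", and the parseSpec "format" are left as %s placeholders, presumably so the tests that load them can run each file against both the simple parquet reader and the parquet-avro reader. A hedged sketch of how such a template might be filled; the helper and its arguments are illustrative and not part of this PR:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class SpecTemplateSketch
    {
      // Loads a spec template and fills its %s placeholders in order: inputFormat
      // class, parser type, and (when present) parseSpec format. Extra arguments
      // are ignored for the "timeAndDims" templates that only have two placeholders.
      public static String loadSpec(
          String templatePath,
          String inputFormatClass,
          String parserType,
          String parseSpecFormat
      ) throws IOException
      {
        String template = new String(Files.readAllBytes(Paths.get(templatePath)), StandardCharsets.UTF_8);
        return String.format(template, inputFormatClass, parserType, parseSpecFormat);
      }
    }

For instance, dec_in_i64.json could be filled against the parquet-avro reader added in this PR with loadSpec("example/decimals/dec_in_i64.json", "org.apache.druid.data.input.parquet.avro.DruidParquetAvroInputFormat", "parquet-avro", "avro").
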
diff --git a/extensions-core/parquet-extensions/example/flattening/flat_1.json b/extensions-core/parquet-extensions/example/flattening/flat_1.json
new file mode 100644
index 00000000000..95d9cc2dfaf
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/flattening/flat_1.json
@@ -0,0 +1,76 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/flattening/test_flat_1.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "flat",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "timeAndDims",
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto"
+          },
+          "dimensionsSpec": {
+            "dimensions": [
+              "dim1",
+              "dim2",
+              "dim3",
+              "listDim"
+            ],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "longSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/flattening/flat_1_autodiscover_fields.json b/extensions-core/parquet-extensions/example/flattening/flat_1_autodiscover_fields.json
new file mode 100644
index 00000000000..3ded41c2b59
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/flattening/flat_1_autodiscover_fields.json
@@ -0,0 +1,75 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/flattening/test_flat_1.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "flat",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": []
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto"
+          },
+          "dimensionsSpec": {
+            "dimensions": [],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "longSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json b/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json
new file mode 100644
index 00000000000..75caf256f2a
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/flattening/flat_1_flatten.json
@@ -0,0 +1,97 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/flattening/test_flat_1.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "flat",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": false,
+            "fields": [
+              {
+                "type": "root",
+                "name": "timestamp"
+              },
+              {
+                "type": "root",
+                "name": "dim1"
+              },
+              {
+                "type": "root",
+                "name": "dim2"
+              },
+              {
+                "type": "root",
+                "name": "dim3"
+              },
+              {
+                "type": "path",
+                "name": "list",
+                "expr": "$.listDim"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto"
+          },
+          "dimensionsSpec": {
+            "dimensions": [],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "longSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/flattening/flat_1_list_index.json b/extensions-core/parquet-extensions/example/flattening/flat_1_list_index.json
new file mode 100644
index 00000000000..219afde8892
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/flattening/flat_1_list_index.json
@@ -0,0 +1,93 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/flattening/test_flat_1.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "flat",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": false,
+            "fields": [
+              {
+                "type": "root",
+                "name": "timestamp"
+              },
+              {
+                "type": "root",
+                "name": "dim1"
+              },
+              {
+                "type": "root",
+                "name": "dim2"
+              },
+              {
+                "type": "path",
+                "name": "listextracted",
+                "expr": "$.listDim[1]"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto"
+          },
+          "dimensionsSpec": {
+            "dimensions": [],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "longSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/flattening/nested_1.json b/extensions-core/parquet-extensions/example/flattening/nested_1.json
new file mode 100644
index 00000000000..752605cb56d
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/flattening/nested_1.json
@@ -0,0 +1,73 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/flattening/test_nested_1.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "test1",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "timeAndDims",
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto"
+          },
+          "dimensionsSpec": {
+            "dimensions": [
+              "dim1"
+            ],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "longSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/flattening/nested_1_autodiscover_fields.json b/extensions-core/parquet-extensions/example/flattening/nested_1_autodiscover_fields.json
new file mode 100644
index 00000000000..05171c0f130
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/flattening/nested_1_autodiscover_fields.json
@@ -0,0 +1,75 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/flattening/test_nested_1.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "test1",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": []
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto"
+          },
+          "dimensionsSpec": {
+            "dimensions": [],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "longSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/flattening/nested_1_flatten.json b/extensions-core/parquet-extensions/example/flattening/nested_1_flatten.json
new file mode 100644
index 00000000000..62af8ca78c7
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/flattening/nested_1_flatten.json
@@ -0,0 +1,109 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/flattening/test_nested_1.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "test1",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": [
+              {
+                "type": "root",
+                "name": "timestamp"
+              },
+              {
+                "type": "root",
+                "name": "dim1"
+              },
+              {
+                "type": "path",
+                "name": "dim2",
+                "expr": "$.nestedData.dim2"
+              },
+              {
+                "type": "path",
+                "name": "dim3",
+                "expr": "$.nestedData.dim3"
+              },
+              {
+                "type": "path",
+                "name": "metric2",
+                "expr": "$.nestedData.metric2"
+              },
+              {
+                "type": "path",
+                "name": "listDim",
+                "expr": "$.nestedData.listDim[*]"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto"
+          },
+          "dimensionsSpec": {
+            "dimensions": [],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "longSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        },
+        {
+          "type": "longSum",
+          "name": "metric2",
+          "fieldName": "metric2"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
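
The "$.nestedData.*" expressions in nested_1_flatten.json are resolved by the record flatteners the reworked parsers construct; the avro-based parser later in this diff builds one from the parse spec's flattenSpec and an AvroFlattenerMaker. A condensed sketch of that wiring, with the surrounding class and method names made up for the example:

    import java.util.Map;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.druid.data.input.avro.AvroFlattenerMaker;
    import org.apache.druid.java.util.common.parsers.JSONPathSpec;
    import org.apache.druid.java.util.common.parsers.ObjectFlattener;
    import org.apache.druid.java.util.common.parsers.ObjectFlatteners;

    public class RecordFlattenerSketch
    {
      // Builds a flattener from the flattenSpec and collapses a GenericRecord into
      // the flat Map<String, Object> that MapBasedInputRow consumes.
      public static Map<String, Object> flatten(JSONPathSpec flattenSpec, GenericRecord record, boolean binaryAsString)
      {
        ObjectFlattener<GenericRecord> flattener = ObjectFlatteners.create(
            flattenSpec,
            new AvroFlattenerMaker(false, binaryAsString)
        );
        return flattener.flatten(record);
      }
    }
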
diff --git a/extensions-core/parquet-extensions/example/flattening/nested_1_list_index.json b/extensions-core/parquet-extensions/example/flattening/nested_1_list_index.json
new file mode 100644
index 00000000000..447c6f6c434
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/flattening/nested_1_list_index.json
@@ -0,0 +1,99 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/flattening/test_nested_1.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "test1",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": [
+              {
+                "type": "root",
+                "name": "timestamp"
+              },
+              {
+                "type": "root",
+                "name": "dim1"
+              },
+              {
+                "type": "path",
+                "name": "dim2",
+                "expr": "$.nestedData.dim2"
+              },
+              {
+                "type": "path",
+                "name": "dim3",
+                "expr": "$.nestedData.dim3"
+              },
+              {
+                "type": "path",
+                "name": "listextracted",
+                "expr": "$.nestedData.listDim[1]"
+              }
+            ]
+          },
+          "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto"
+          },
+          "dimensionsSpec": {
+            "dimensions": [],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "longSum",
+          "name": "metric1",
+          "fieldName": "metric1"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/flattening/test_flat_1.parquet b/extensions-core/parquet-extensions/example/flattening/test_flat_1.parquet
new file mode 100644
index 00000000000..56325a5a8a8
Binary files /dev/null and b/extensions-core/parquet-extensions/example/flattening/test_flat_1.parquet differ
diff --git a/extensions-core/parquet-extensions/example/flattening/test_nested_1.parquet b/extensions-core/parquet-extensions/example/flattening/test_nested_1.parquet
new file mode 100644
index 00000000000..d3694e2acbc
Binary files /dev/null and b/extensions-core/parquet-extensions/example/flattening/test_nested_1.parquet differ
diff --git a/extensions-contrib/parquet-extensions/example/date_test_data_job_date.json b/extensions-core/parquet-extensions/example/timestamps/date_test_data_job_date.json
similarity index 78%
rename from extensions-contrib/parquet-extensions/example/date_test_data_job_date.json
rename to extensions-core/parquet-extensions/example/timestamps/date_test_data_job_date.json
index efdedf9f4bc..19677515f47 100755
--- a/extensions-contrib/parquet-extensions/example/date_test_data_job_date.json
+++ b/extensions-core/parquet-extensions/example/timestamps/date_test_data_job_date.json
@@ -5,14 +5,14 @@
       "type": "hadoop",
       "inputSpec": {
         "type": "static",
-        "inputFormat": "org.apache.druid.data.input.parquet.DruidParquetInputFormat",
-        "paths": "example/test_date_data.snappy.parquet"
+        "inputFormat": "%s",
+        "paths": "example/timestamps/test_date_data.snappy.parquet"
       },
       "metadataUpdateSpec": {
         "type": "postgresql",
         "connectURI": "jdbc:postgresql://localhost/druid",
-        "user" : "druid",
-        "password" : "asdf",
+        "user": "druid",
+        "password": "asdf",
         "segmentTable": "druid_segments"
       },
       "segmentOutputPath": "/tmp/segments"
@@ -20,7 +20,7 @@
     "dataSchema": {
       "dataSource": "date_dataset_date",
       "parser": {
-        "type": "parquet",
+        "type": "%s",
         "parseSpec": {
           "format": "timeAndDims",
           "timestampSpec": {
@@ -33,15 +33,19 @@
           }
         }
       },
-      "metricsSpec": [{
-        "type": "count",
-        "name": "count"
-      }],
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        }
+      ],
       "granularitySpec": {
         "type": "uniform",
         "segmentGranularity": "DAY",
         "queryGranularity": "NONE",
-        "intervals": ["2017-06-17/2017-09-24"]
+        "intervals": [
+          "2017-06-17/2017-09-24"
+        ]
       }
     },
     "tuningConfig": {
@@ -50,7 +54,7 @@
       "partitionsSpec": {
         "targetPartitionSize": 5000000
       },
-      "jobProperties" : {
+      "jobProperties": {
         "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
         "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
         "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
diff --git a/extensions-contrib/parquet-extensions/example/date_test_data_job_string.json b/extensions-core/parquet-extensions/example/timestamps/date_test_data_job_string.json
similarity index 79%
rename from extensions-contrib/parquet-extensions/example/date_test_data_job_string.json
rename to extensions-core/parquet-extensions/example/timestamps/date_test_data_job_string.json
index 6da2c081a90..ddccab41d61 100755
--- a/extensions-contrib/parquet-extensions/example/date_test_data_job_string.json
+++ b/extensions-core/parquet-extensions/example/timestamps/date_test_data_job_string.json
@@ -5,14 +5,14 @@
       "type": "hadoop",
       "inputSpec": {
         "type": "static",
-        "inputFormat": "org.apache.druid.data.input.parquet.DruidParquetInputFormat",
-        "paths": "example/test_date_data.snappy.parquet"
+        "inputFormat": "%s",
+        "paths": "example/timestamps/test_date_data.snappy.parquet"
       },
       "metadataUpdateSpec": {
         "type": "postgresql",
         "connectURI": "jdbc:postgresql://localhost/druid",
-        "user" : "druid",
-        "password" : "asdf",
+        "user": "druid",
+        "password": "asdf",
         "segmentTable": "druid_segments"
       },
       "segmentOutputPath": "/tmp/segments"
@@ -20,7 +20,7 @@
     "dataSchema": {
       "dataSource": "date_dataset_string",
       "parser": {
-        "type": "parquet",
+        "type": "%s",
         "parseSpec": {
           "format": "timeAndDims",
           "timestampSpec": {
@@ -34,15 +34,19 @@
           }
         }
       },
-      "metricsSpec": [{
-        "type": "count",
-        "name": "count"
-      }],
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        }
+      ],
       "granularitySpec": {
         "type": "uniform",
         "segmentGranularity": "DAY",
         "queryGranularity": "NONE",
-        "intervals": ["2017-06-17/2017-09-24"]
+        "intervals": [
+          "2017-06-17/2017-09-24"
+        ]
       }
     },
     "tuningConfig": {
@@ -51,7 +55,7 @@
       "partitionsSpec": {
         "targetPartitionSize": 5000000
       },
-      "jobProperties" : {
+      "jobProperties": {
         "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
         "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
         "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
diff --git a/extensions-core/parquet-extensions/example/timestamps/int96_timestamp.json b/extensions-core/parquet-extensions/example/timestamps/int96_timestamp.json
new file mode 100644
index 00000000000..15eb1f11ad6
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/timestamps/int96_timestamp.json
@@ -0,0 +1,64 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/timestamps/int96_timestamp.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "nest",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true
+          },
+          "timestampSpec": {
+            "column": "ts",
+            "format": "auto"
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-core/parquet-extensions/example/timestamps/int96_timestamp.parquet b/extensions-core/parquet-extensions/example/timestamps/int96_timestamp.parquet
new file mode 100644
index 00000000000..21e5318db98
Binary files /dev/null and b/extensions-core/parquet-extensions/example/timestamps/int96_timestamp.parquet differ
diff --git a/extensions-contrib/parquet-extensions/example/test_date_data.snappy.parquet b/extensions-core/parquet-extensions/example/timestamps/test_date_data.snappy.parquet
similarity index 100%
rename from extensions-contrib/parquet-extensions/example/test_date_data.snappy.parquet
rename to extensions-core/parquet-extensions/example/timestamps/test_date_data.snappy.parquet
diff --git a/extensions-core/parquet-extensions/example/timestamps/timemillis-in-i64.parquet b/extensions-core/parquet-extensions/example/timestamps/timemillis-in-i64.parquet
new file mode 100644
index 00000000000..d3c39e2c26e
Binary files /dev/null and b/extensions-core/parquet-extensions/example/timestamps/timemillis-in-i64.parquet differ
diff --git a/extensions-core/parquet-extensions/example/timestamps/timemillis_in_i64.json b/extensions-core/parquet-extensions/example/timestamps/timemillis_in_i64.json
new file mode 100644
index 00000000000..536b83cc5a4
--- /dev/null
+++ b/extensions-core/parquet-extensions/example/timestamps/timemillis_in_i64.json
@@ -0,0 +1,70 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "%s",
+        "paths": "example/timestamps/timemillis-in-i64.parquet"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user": "druid",
+        "password": "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "/tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "flat",
+      "parser": {
+        "type": "%s",
+        "parseSpec": {
+          "format": "%s",
+          "flattenSpec": {
+            "useFieldDiscovery": true,
+            "fields": []
+          },
+          "timestampSpec": {
+            "column": "time",
+            "format": "auto"
+          },
+          "dimensionsSpec": {
+            "dimensions": [],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        }
+      ],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "NONE",
+        "intervals": [
+          "2018-08-30/2020-09-02"
+        ]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties": {
+        "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+        "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+      },
+      "leaveIntermediate": true
+    }
+  }
+}
diff --git a/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json b/extensions-core/parquet-extensions/example/wiki/wiki.json
similarity index 73%
rename from extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json
rename to extensions-core/parquet-extensions/example/wiki/wiki.json
index 4de52c5d0ad..9f611b9638c 100755
--- a/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json
+++ b/extensions-core/parquet-extensions/example/wiki/wiki.json
@@ -5,14 +5,14 @@
       "type": "hadoop",
       "inputSpec": {
         "type": "static",
-        "inputFormat": "org.apache.druid.data.input.parquet.DruidParquetInputFormat",
-        "paths": "example/wikipedia_list.parquet"
+        "inputFormat": "%s",
+        "paths": "example/wiki/wiki.parquet"
       },
       "metadataUpdateSpec": {
         "type": "postgresql",
         "connectURI": "jdbc:postgresql://localhost/druid",
-        "user" : "druid",
-        "password" : "asdf",
+        "user": "druid",
+        "password": "asdf",
         "segmentTable": "druid_segments"
       },
       "segmentOutputPath": "/tmp/segments"
@@ -20,7 +20,7 @@
     "dataSchema": {
       "dataSource": "wikipedia",
       "parser": {
-        "type": "parquet",
+        "type": "%s",
         "parseSpec": {
           "format": "timeAndDims",
           "timestampSpec": {
@@ -39,23 +39,29 @@
           }
         }
       },
-      "metricsSpec": [{
-        "type": "count",
-        "name": "count"
-      }, {
-        "type": "doubleSum",
-        "name": "deleted",
-        "fieldName": "deleted"
-      }, {
-        "type": "doubleSum",
-        "name": "delta",
-        "fieldName": "delta"
-      }],
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "doubleSum",
+          "name": "deleted",
+          "fieldName": "deleted"
+        },
+        {
+          "type": "doubleSum",
+          "name": "delta",
+          "fieldName": "delta"
+        }
+      ],
       "granularitySpec": {
         "type": "uniform",
         "segmentGranularity": "DAY",
         "queryGranularity": "NONE",
-        "intervals": ["2013-08-30/2013-09-02"]
+        "intervals": [
+          "2013-08-30/2013-09-02"
+        ]
       }
     },
     "tuningConfig": {
@@ -64,7 +70,7 @@
       "partitionsSpec": {
         "targetPartitionSize": 5000000
       },
-      "jobProperties" : {
+      "jobProperties": {
         "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
         "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
         "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
diff --git a/extensions-contrib/parquet-extensions/example/wikipedia_list.parquet b/extensions-core/parquet-extensions/example/wiki/wiki.parquet
similarity index 100%
rename from extensions-contrib/parquet-extensions/example/wikipedia_list.parquet
rename to extensions-core/parquet-extensions/example/wiki/wiki.parquet
diff --git a/extensions-core/parquet-extensions/pom.xml b/extensions-core/parquet-extensions/pom.xml
new file mode 100644
index 00000000000..bfdcf267898
--- /dev/null
+++ b/extensions-core/parquet-extensions/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <groupId>org.apache.druid.extensions</groupId>
+  <artifactId>druid-parquet-extensions</artifactId>
+  <name>druid-parquet-extensions</name>
+  <description>druid-parquet-extensions</description>
+
+  <parent>
+    <artifactId>druid</artifactId>
+    <groupId>org.apache.druid</groupId>
+    <version>0.13.0-incubating-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <properties>
+    <parquet.version>1.10.0</parquet.version>
+    <snappy.version>1.1.7.2</snappy.version>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.druid.extensions</groupId>
+      <artifactId>druid-avro-extensions</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-column</artifactId>
+      <version>${parquet.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-compress</groupId>
+          <artifactId>commons-compress</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>${parquet.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-pool</groupId>
+          <artifactId>commons-pool</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>it.unimi.dsi</groupId>
+          <artifactId>fastutil</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-core-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-mapper-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <version>${snappy.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-core</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-indexing-hadoop</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/extensions-contrib/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java
similarity index 67%
rename from extensions-contrib/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java
rename to extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java
index 49ba6cc0fa0..096cf209a46 100644
--- a/extensions-contrib/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetExtensionsModule.java
@@ -23,6 +23,9 @@
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.inject.Binder;
+import org.apache.druid.data.input.parquet.avro.ParquetAvroHadoopInputRowParser;
+import org.apache.druid.data.input.parquet.simple.ParquetHadoopInputRowParser;
+import org.apache.druid.data.input.parquet.simple.ParquetParseSpec;
 import org.apache.druid.initialization.DruidModule;
 
 import java.util.Collections;
@@ -30,6 +33,10 @@
 
 public class ParquetExtensionsModule implements DruidModule
 {
+  public static final String PARQUET_SIMPLE_INPUT_PARSER_TYPE = "parquet";
+  public static final String PARQUET_SIMPLE_PARSE_SPEC_TYPE = "parquet";
+  public static final String PARQUET_AVRO_INPUT_PARSER_TYPE = "parquet-avro";
+  public static final String PARQUET_AVRO_PARSE_SPEC_TYPE = "avro";
 
   @Override
   public List<? extends Module> getJacksonModules()
@@ -37,12 +44,16 @@
     return Collections.singletonList(
         new SimpleModule("ParquetInputRowParserModule")
             .registerSubtypes(
-                new NamedType(ParquetHadoopInputRowParser.class, "parquet")
+                new NamedType(ParquetAvroHadoopInputRowParser.class, PARQUET_AVRO_INPUT_PARSER_TYPE),
+                new NamedType(ParquetHadoopInputRowParser.class, PARQUET_SIMPLE_INPUT_PARSER_TYPE),
+                new NamedType(ParquetParseSpec.class, PARQUET_SIMPLE_INPUT_PARSER_TYPE)
             )
     );
   }
 
   @Override
   public void configure(Binder binder)
-  { }
+  {
+
+  }
 }
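
The registered NamedTypes are what let the "type" strings in the example specs resolve to the new parser classes: "parquet" maps to the simple ParquetHadoopInputRowParser (and its ParquetParseSpec), "parquet-avro" to ParquetAvroHadoopInputRowParser. A rough sketch of exercising that registration directly with a plain Jackson ObjectMapper; the helper class is hypothetical, and inside Druid the injected mapper already has extension modules registered:

    import com.fasterxml.jackson.databind.Module;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.IOException;
    import org.apache.druid.data.input.impl.InputRowParser;
    import org.apache.druid.data.input.parquet.ParquetExtensionsModule;

    public class ParquetParserJsonSketch
    {
      // Deserializes a parser definition such as {"type": "parquet", "parseSpec": {...}}
      // using the subtypes registered by ParquetExtensionsModule.
      public static InputRowParser<?> readParser(String parserJson) throws IOException
      {
        ObjectMapper mapper = new ObjectMapper();
        for (Module jacksonModule : new ParquetExtensionsModule().getJacksonModules()) {
          mapper.registerModule(jacksonModule);
        }
        return mapper.readValue(parserJson, InputRowParser.class);
      }
    }
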
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/DruidParquetAvroInputFormat.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/DruidParquetAvroInputFormat.java
new file mode 100755
index 00000000000..783ec983e0a
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/DruidParquetAvroInputFormat.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.druid.data.input.parquet.avro;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.parquet.avro.DruidParquetAvroReadSupport;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+
+public class DruidParquetAvroInputFormat extends ParquetInputFormat<GenericRecord>
+{
+  public DruidParquetAvroInputFormat()
+  {
+    super(DruidParquetAvroReadSupport.class);
+  }
+}
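
DruidParquetAvroInputFormat is simply ParquetInputFormat bound to a Druid-specific ReadSupport, so parquet files are read as avro GenericRecords. Outside the Hadoop indexer the same class wires into a MapReduce job with the usual calls; a brief sketch under that assumption, with the job name and paths as placeholders:

    import java.io.IOException;
    import org.apache.druid.data.input.parquet.avro.DruidParquetAvroInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class ParquetJobWiringSketch
    {
      // Configures a job to read parquet rows as GenericRecords via
      // DruidParquetAvroReadSupport.
      public static Job newJob(String inputPath) throws IOException
      {
        Job job = Job.getInstance(new Configuration(), "parquet-avro-example");
        job.setInputFormatClass(DruidParquetAvroInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        return job;
      }
    }
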
diff --git a/extensions-contrib/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
similarity index 61%
rename from extensions-contrib/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetHadoopInputRowParser.java
rename to extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
index baa738389f3..cdf1f85c9a5 100755
--- a/extensions-contrib/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetHadoopInputRowParser.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
@@ -16,38 +16,45 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.druid.data.input.parquet;
+package org.apache.druid.data.input.parquet.avro;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.avro.AvroFlattenerMaker;
+import org.apache.druid.data.input.avro.AvroParseSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ObjectFlattener;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-public class ParquetHadoopInputRowParser implements InputRowParser<GenericRecord>
+public class ParquetAvroHadoopInputRowParser implements InputRowParser<GenericRecord>
 {
   private final ParseSpec parseSpec;
   private final boolean binaryAsString;
-  private final List<String> dimensions;
   private final TimestampSpec timestampSpec;
+  private final ObjectFlattener<GenericRecord> recordFlattener;
+
 
   @JsonCreator
-  public ParquetHadoopInputRowParser(
+  public ParquetAvroHadoopInputRowParser(
       @JsonProperty("parseSpec") ParseSpec parseSpec,
       @JsonProperty("binaryAsString") Boolean binaryAsString
   )
@@ -56,11 +63,17 @@ public ParquetHadoopInputRowParser(
     this.timestampSpec = parseSpec.getTimestampSpec();
     this.binaryAsString = binaryAsString == null ? false : binaryAsString;
 
-    List<DimensionSchema> dimensionSchema = parseSpec.getDimensionsSpec().getDimensions();
-    this.dimensions = new ArrayList<>();
-    for (DimensionSchema dim : dimensionSchema) {
-      this.dimensions.add(dim.getName());
+    final JSONPathSpec flattenSpec;
+    if (parseSpec instanceof AvroParseSpec) {
+      flattenSpec = ((AvroParseSpec) parseSpec).getFlattenSpec();
+    } else {
+      flattenSpec = JSONPathSpec.DEFAULT;
     }
+
+    this.recordFlattener = ObjectFlatteners.create(
+        flattenSpec,
+        new AvroFlattenerMaker(false, this.binaryAsString)
+    );
   }
 
   @Nullable
@@ -80,25 +93,29 @@ private LogicalType determineTimestampSpecLogicalType(Schema schema, String time
   @Override
   public List<InputRow> parseBatch(GenericRecord record)
   {
-    // Map the record to a map
-    GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, binaryAsString);
+    Map<String, Object> row = recordFlattener.flatten(record);
 
-    // Determine logical type of the timestamp column
+    final List<String> dimensions = parseSpec.getDimensionsSpec().hasCustomDimensions()
+                                    ? parseSpec.getDimensionsSpec().getDimensionNames()
+                                    : new ArrayList<>(
+                                        Sets.difference(
+                                            row.keySet(),
+                                            parseSpec.getDimensionsSpec()
+                                                     .getDimensionExclusions()
+                                        )
+                                    );
+    // check for parquet Date
+    // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
     LogicalType logicalType = determineTimestampSpecLogicalType(record.getSchema(), timestampSpec.getTimestampColumn());
-
-    // Parse time timestamp based on the parquet schema.
-    // https://github.com/Parquet/parquet-format/blob/1afe8d9ae7e38acfc4ea273338a3c0c35feca115/LogicalTypes.md#date
     DateTime dateTime;
     if (logicalType instanceof LogicalTypes.Date) {
-      int daysSinceEpoch = (Integer) genericRecordAsMap.get(timestampSpec.getTimestampColumn());
-
+      int daysSinceEpoch = (Integer) record.get(timestampSpec.getTimestampColumn());
       dateTime = DateTimes.utc(TimeUnit.DAYS.toMillis(daysSinceEpoch));
     } else {
       // Fall back to a binary format that will be parsed using joda-time
-      dateTime = timestampSpec.extractTimestamp(genericRecordAsMap);
+      dateTime = timestampSpec.extractTimestamp(row);
     }
-
-    return ImmutableList.of(new MapBasedInputRow(dateTime, dimensions, genericRecordAsMap));
+    return ImmutableList.of(new MapBasedInputRow(dateTime, dimensions, row));
   }
 
   @JsonProperty
@@ -111,6 +128,6 @@ public ParseSpec getParseSpec()
   @Override
   public InputRowParser withParseSpec(ParseSpec parseSpec)
   {
-    return new ParquetHadoopInputRowParser(parseSpec, binaryAsString);
+    return new ParquetAvroHadoopInputRowParser(parseSpec, binaryAsString);
   }
 }
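
When no dimensions are configured, the parser above auto-discovers them from the flattened record by removing the exclusions from the row's key set. A minimal sketch of that branch (not from the PR; the row contents and exclusion set are invented for illustration):

    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.ImmutableSet;
    import com.google.common.collect.Sets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    Map<String, Object> row = ImmutableMap.of("page", "Gypsy Danger", "added", 57, "timestamp", 1471800234L);
    Set<String> exclusions = ImmutableSet.of("timestamp");
    // every flattened key that is not excluded becomes a dimension
    List<String> dimensions = new ArrayList<>(Sets.difference(row.keySet(), exclusions));
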
diff --git a/extensions-contrib/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/DruidParquetInputFormat.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetInputFormat.java
old mode 100755
new mode 100644
similarity index 86%
rename from extensions-contrib/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/DruidParquetInputFormat.java
rename to extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetInputFormat.java
index a192893a377..ed8e623c6ce
--- a/extensions-contrib/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/DruidParquetInputFormat.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetInputFormat.java
@@ -16,13 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.druid.data.input.parquet;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.parquet.avro.DruidParquetReadSupport;
+package org.apache.druid.data.input.parquet.simple;
+
+import org.apache.parquet.example.data.Group;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 
-public class DruidParquetInputFormat extends ParquetInputFormat<GenericRecord>
+public class DruidParquetInputFormat extends ParquetInputFormat<Group>
 {
   public DruidParquetInputFormat()
   {
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetReadSupport.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetReadSupport.java
new file mode 100644
index 00000000000..a1ed46c0f65
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/DruidParquetReadSupport.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet.simple;
+
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.indexer.HadoopDruidIndexerConfig;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class DruidParquetReadSupport extends GroupReadSupport
+{
+  /**
+   * Select the columns from the parquet schema that are used in the schema of the ingestion job
+   *
+   * @param context The context of the file to be read
+   *
+   * @return the partial schema that only contains the columns that are being used in the schema
+   */
+  private MessageType getPartialReadSchema(InitContext context)
+  {
+    MessageType fullSchema = context.getFileSchema();
+
+    String name = fullSchema.getName();
+
+    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
+    ParseSpec parseSpec = config.getParser().getParseSpec();
+
+    // TODO: when a flattenSpec is present we currently read the full schema; we could still trim
+    // the read schema by parsing the flattenSpec and checking whether it auto-discovers fields
+    if (parseSpec instanceof ParquetParseSpec) {
+      if (((ParquetParseSpec) parseSpec).getFlattenSpec() != null) {
+        return fullSchema;
+      }
+    }
+
+    String tsField = parseSpec.getTimestampSpec().getTimestampColumn();
+
+    List<DimensionSchema> dimensionSchema = parseSpec.getDimensionsSpec().getDimensions();
+    Set<String> dimensions = new HashSet<>();
+    for (DimensionSchema dim : dimensionSchema) {
+      dimensions.add(dim.getName());
+    }
+
+    Set<String> metricsFields = new HashSet<>();
+    for (AggregatorFactory agg : config.getSchema().getDataSchema().getAggregators()) {
+      metricsFields.addAll(agg.requiredFields());
+    }
+
+    List<Type> partialFields = new ArrayList<>();
+
+    for (Type type : fullSchema.getFields()) {
+      if (tsField.equals(type.getName())
+          || metricsFields.contains(type.getName())
+          || dimensions.size() > 0 && dimensions.contains(type.getName())
+          || dimensions.size() == 0) {
+        partialFields.add(type);
+      }
+    }
+
+    return new MessageType(name, partialFields);
+  }
+
+  @Override
+  public ReadContext init(InitContext context)
+  {
+    MessageType requestedProjection = getSchemaForRead(context.getFileSchema(), getPartialReadSchema(context));
+    return new ReadContext(requestedProjection);
+  }
+}
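
The field filter in getPartialReadSchema above reduces to one predicate per column; an equivalent sketch (not from the PR, purely illustrative):

    import java.util.Set;

    // keep the timestamp column, any metric input field, and either the declared dimensions
    // or, when no dimensions are declared, every remaining column
    static boolean keepField(String field, String tsField, Set<String> dims, Set<String> metrics)
    {
      return tsField.equals(field) || metrics.contains(field) || dims.isEmpty() || dims.contains(field);
    }
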
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupConverter.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupConverter.java
new file mode 100644
index 00000000000..8cb193862e3
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupConverter.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet.simple;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+class ParquetGroupConverter
+{
+  private static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588;
+  private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
+  private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
+
+  /**
+   * See {@link ParquetGroupConverter#convertField(Group, String)}
+   */
+  @Nullable
+  private static Object convertField(Group g, String fieldName, boolean binaryAsString)
+  {
+    if (!g.getType().containsField(fieldName)) {
+      return null;
+    }
+
+    final int fieldIndex = g.getType().getFieldIndex(fieldName);
+
+    Type fieldType = g.getType().getFields().get(fieldIndex);
+
+    // primitive field
+    if (fieldType.isPrimitive()) {
+      // primitive list
+      if (fieldType.getRepetition().equals(Type.Repetition.REPEATED)) {
+        int repeated = g.getFieldRepetitionCount(fieldIndex);
+        List<Object> vals = new ArrayList<>();
+        for (int i = 0; i < repeated; i++) {
+          vals.add(convertPrimitiveField(g, fieldIndex, i, binaryAsString));
+        }
+        return vals;
+      }
+      return convertPrimitiveField(g, fieldIndex, binaryAsString);
+    } else {
+      if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+        return convertRepeatedFieldToList(g, fieldIndex, binaryAsString);
+      }
+
+      if (isLogicalMapType(fieldType)) {
+        return convertLogicalMap(g.getGroup(fieldIndex, 0), binaryAsString);
+      }
+
+      if (isLogicalListType(fieldType)) {
+        return convertLogicalList(g.getGroup(fieldIndex, 0), binaryAsString);
+      }
+
+      // not a list and not a primitive; return the nested group type
+      return g.getGroup(fieldIndex, 0);
+    }
+  }
+
+  /**
+   * convert a repeated field into a list of primitives or groups
+   */
+  private static List<Object> convertRepeatedFieldToList(Group g, int fieldIndex, boolean binaryAsString)
+  {
+
+    Type t = g.getType().getFields().get(fieldIndex);
+    assert t.getRepetition().equals(Type.Repetition.REPEATED);
+    int repeated = g.getFieldRepetitionCount(fieldIndex);
+    List<Object> vals = new ArrayList<>();
+    for (int i = 0; i < repeated; i++) {
+      if (t.isPrimitive()) {
+        vals.add(convertPrimitiveField(g, fieldIndex, i, binaryAsString));
+      } else {
+        vals.add(g.getGroup(fieldIndex, i));
+      }
+    }
+    return vals;
+  }
+
+  /**
+   * check if a parquet type is a valid 'list' type
+   */
+  private static boolean isLogicalListType(Type listType)
+  {
+    return !listType.isPrimitive() &&
+           listType.getOriginalType() != null &&
+           listType.getOriginalType().equals(OriginalType.LIST) &&
+           listType.asGroupType().getFieldCount() == 1 &&
+           listType.asGroupType().getFields().get(0).isRepetition(Type.Repetition.REPEATED);
+  }
+
+  /**
+   * convert a parquet 'list' logical type {@link Group} to a java list of primitives or groups
+   */
+  private static List<Object> convertLogicalList(Group g, boolean binaryAsString)
+  {
+    /*
+      // List<Integer> (nullable list, non-null elements)
+      optional group my_list (LIST) {
+        repeated int32 element;
+      }
+
+      // List<Tuple<String, Integer>> (nullable list, non-null elements)
+      optional group my_list (LIST) {
+        repeated group element {
+          required binary str (UTF8);
+          required int32 num;
+        };
+      }
+
+      // List<OneTuple<String>> (nullable list, non-null elements)
+      optional group my_list (LIST) {
+        repeated group array {
+          required binary str (UTF8);
+        };
+      }
+
+      // List<OneTuple<String>> (nullable list, non-null elements)
+      optional group my_list (LIST) {
+        repeated group my_list_tuple {
+          required binary str (UTF8);
+        };
+      }
+     */
+    assert isLogicalListType(g.getType());
+    int repeated = g.getFieldRepetitionCount(0);
+    boolean isListItemPrimitive = g.getType().getFields().get(0).isPrimitive();
+    List<Object> vals = new ArrayList<>();
+
+    for (int i = 0; i < repeated; i++) {
+      if (isListItemPrimitive) {
+        vals.add(convertPrimitiveField(g, 0, i, binaryAsString));
+      } else {
+        Group listItem = g.getGroup(0, i);
+        vals.add(listItem);
+      }
+    }
+    return vals;
+  }
+
+  /**
+   * check if a parquet type is a valid 'map' type
+   */
+  private static boolean isLogicalMapType(Type groupType)
+  {
+    OriginalType ot = groupType.getOriginalType();
+    if (groupType.isPrimitive() || ot == null || groupType.isRepetition(Type.Repetition.REPEATED)) {
+      return false;
+    }
+    if (groupType.getOriginalType().equals(OriginalType.MAP) ||
+        groupType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE)) {
+      GroupType myMapType = groupType.asGroupType();
+      if (myMapType.getFieldCount() != 1 || myMapType.getFields().get(0).isPrimitive()) {
+        return false;
+      }
+      GroupType mapItemType = myMapType.getFields().get(0).asGroupType();
+      return mapItemType.isRepetition(Type.Repetition.REPEATED) &&
+             mapItemType.getFieldCount() == 2 &&
+             mapItemType.getFields().get(0).getName().equalsIgnoreCase("key") &&
+             mapItemType.getFields().get(0).isPrimitive() &&
+             mapItemType.getFields().get(1).getName().equalsIgnoreCase("value");
+    }
+    return false;
+  }
+
+  /**
+   * Convert a parquet 'map' logical type {@link Group} to a java map of string keys to groups/lists/primitive values
+   */
+  private static Map<String, Object> convertLogicalMap(Group g, boolean binaryAsString)
+  {
+    /*
+      // Map<String, Integer> (nullable map, non-null values)
+      optional group my_map (MAP) {
+        repeated group map {
+          required binary str (UTF8);
+          required int32 num;
+        }
+      }
+
+      // Map<String, Integer> (nullable map, nullable values)
+      optional group my_map (MAP_KEY_VALUE) {
+        repeated group map {
+          required binary key (UTF8);
+          optional int32 value;
+        }
+      }
+     */
+    assert isLogicalMapType(g.getType());
+    int mapEntries = g.getFieldRepetitionCount(0);
+    Map<String, Object> converted = new HashMap<>();
+    for (int i = 0; i < mapEntries; i++) {
+      Group mapEntry = g.getGroup(0, i);
+      String key = convertPrimitiveField(mapEntry, 0, binaryAsString).toString();
+      Object value = convertField(mapEntry, "value", binaryAsString);
+      converted.put(key, value);
+    }
+    return converted;
+  }
+
+  /**
+   * Convert a primitive group field to an "ingestion friendly" Java object
+   *
+   * @return an "ingestion ready" Java object, or null if the value could not be converted
+   */
+  @Nullable
+  private static Object convertPrimitiveField(Group g, int fieldIndex, boolean binaryAsString)
+  {
+    PrimitiveType pt = (PrimitiveType) g.getType().getFields().get(fieldIndex);
+    if (pt.isRepetition(Type.Repetition.REPEATED) && g.getFieldRepetitionCount(fieldIndex) > 1) {
+      List<Object> vals = new ArrayList<>();
+      for (int i = 0; i < g.getFieldRepetitionCount(fieldIndex); i++) {
+        vals.add(convertPrimitiveField(g, fieldIndex, i, binaryAsString));
+      }
+      return vals;
+    }
+    return convertPrimitiveField(g, fieldIndex, 0, binaryAsString);
+  }
+
+  /**
+   * Convert a primitive group field to an "ingestion friendly" Java object
+   *
+   * @return an "ingestion ready" Java object, or null if the value could not be converted
+   */
+  @Nullable
+  private static Object convertPrimitiveField(Group g, int fieldIndex, int index, boolean binaryAsString)
+  {
+    PrimitiveType pt = (PrimitiveType) g.getType().getFields().get(fieldIndex);
+    OriginalType ot = pt.getOriginalType();
+
+    try {
+      if (ot != null) {
+        // convert logical types
+        switch (ot) {
+          case DATE:
+            long ts = g.getInteger(fieldIndex, index) * MILLIS_IN_DAY;
+            return ts;
+          case TIME_MICROS:
+            return g.getLong(fieldIndex, index);
+          case TIME_MILLIS:
+            return g.getInteger(fieldIndex, index);
+          case TIMESTAMP_MICROS:
+            return TimeUnit.MILLISECONDS.convert(g.getLong(fieldIndex, index), TimeUnit.MICROSECONDS);
+          case TIMESTAMP_MILLIS:
+            return g.getLong(fieldIndex, index);
+          case INTERVAL:
+          /*
+          INTERVAL is used for an interval of time. It must annotate a fixed_len_byte_array of length 12.
+          This array stores three little-endian unsigned integers that represent durations at different
+          granularities of time. The first stores a number in months, the second stores a number in days,
+          and the third stores a number in milliseconds. This representation is independent of any particular
+          timezone or date.
+
+          Each component in this representation is independent of the others. For example, there is no
+          requirement that a large number of days should be expressed as a mix of months and days because there is
+          not a constant conversion from days to months.
+
+          The sort order used for INTERVAL is undefined. When writing data, no min/max statistics should be
+           saved for this type and if such non-compliant statistics are found during reading, they must be ignored.
+           */
+            Binary intervalVal = g.getBinary(fieldIndex, index);
+            IntBuffer intBuf = intervalVal.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+            int months = intBuf.get(0);
+            int days = intBuf.get(1);
+            int millis = intBuf.get(2);
+            StringBuilder periodBuilder = new StringBuilder("P");
+            if (months > 0) {
+              periodBuilder.append(months).append("M");
+            }
+            if (days > 0) {
+              periodBuilder.append(days).append("D");
+            }
+            if (periodBuilder.length() > 1) {
+              Period p = Period.parse(periodBuilder.toString());
+              Duration d = p.toStandardDuration().plus(millis);
+              return d;
+            } else {
+              return new Duration(millis);
+            }
+          case INT_8:
+          case INT_16:
+          case INT_32:
+            return g.getInteger(fieldIndex, index);
+          case INT_64:
+            return g.getLong(fieldIndex, index);
+          // TODO: unsigned integer types are currently read as their signed Java equivalents
+          case UINT_8:
+          case UINT_16:
+          case UINT_32:
+            return g.getInteger(fieldIndex, index);
+          case UINT_64:
+            return g.getLong(fieldIndex, index);
+          case DECIMAL:
+          /*
+            DECIMAL can be used to annotate the following types:
+              int32: for 1 <= precision <= 9
+              int64: for 1 <= precision <= 18; precision < 10 will produce a warning
+              fixed_len_byte_array: precision is limited by the array size. Length n can
+                store <= floor(log_10(2^(8*n - 1) - 1)) base-10 digits
+              binary: precision is not limited, but is required. The minimum number of bytes to store
+                the unscaled value should be used.
+           */
+            int precision = pt.asPrimitiveType().getDecimalMetadata().getPrecision();
+            int scale = pt.asPrimitiveType().getDecimalMetadata().getScale();
+            switch (pt.getPrimitiveTypeName()) {
+              case INT32:
+                return new BigDecimal(g.getInteger(fieldIndex, index));
+              case INT64:
+                return new BigDecimal(g.getLong(fieldIndex, index));
+              case FIXED_LEN_BYTE_ARRAY:
+              case BINARY:
+                Binary value = g.getBinary(fieldIndex, index);
+                return convertBinaryToDecimal(value, precision, scale);
+              default:
+                throw new RE(
+                    "Unknown 'DECIMAL' type supplied to primitive conversion: %s (this should never happen)",
+                    pt.getPrimitiveTypeName()
+                );
+            }
+          case UTF8:
+          case ENUM:
+          case JSON:
+            return g.getString(fieldIndex, index);
+          case LIST:
+          case MAP:
+          case MAP_KEY_VALUE:
+          case BSON:
+          default:
+            throw new RE(
+                "Non-primitive supplied to primitive conversion: %s (this should never happen)",
+                ot.name()
+            );
+        }
+      } else {
+        // fallback to handling the raw primitive type if no logical type mapping
+        switch (pt.getPrimitiveTypeName()) {
+          case BOOLEAN:
+            return g.getBoolean(fieldIndex, index);
+          case INT32:
+            return g.getInteger(fieldIndex, index);
+          case INT64:
+            return g.getLong(fieldIndex, index);
+          case FLOAT:
+            return g.getFloat(fieldIndex, index);
+          case DOUBLE:
+            return g.getDouble(fieldIndex, index);
+          case INT96:
+            Binary tsBin = g.getInt96(fieldIndex, index);
+            return convertInt96BinaryToTimestamp(tsBin);
+          case FIXED_LEN_BYTE_ARRAY:
+          case BINARY:
+            Binary bin = g.getBinary(fieldIndex, index);
+            byte[] bytes = bin.getBytes();
+            if (binaryAsString) {
+              return StringUtils.fromUtf8(bytes);
+            } else {
+              return bytes;
+            }
+          default:
+            throw new RE("Unknown primitive conversion: %s", ot.name());
+        }
+      }
+    }
+    catch (Exception ex) {
+      // a value that cannot be converted (e.g. unexpected physical type) is treated as missing
+      return null;
+    }
+  }
+
+  /**
+   * convert deprecated parquet int96 nanosecond timestamp to a long, based on
+   * https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTimestampUtils.java#L56
+   */
+  private static long convertInt96BinaryToTimestamp(Binary value)
+  {
+    // based on prestodb parquet int96 timestamp conversion
+    byte[] bytes = value.getBytes();
+
+    // little endian encoding - need to invert byte order
+    long timeOfDayNanos =
+        Longs.fromBytes(bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);
+    int julianDay = Ints.fromBytes(bytes[11], bytes[10], bytes[9], bytes[8]);
+
+    long ts = ((julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY) + (timeOfDayNanos / NANOS_PER_MILLISECOND);
+    return ts;
+  }
+
+  /**
+   * convert parquet binary decimal to BigDecimal, lifted from
+   * https://github.com/apache/parquet-mr/blob/master/parquet-pig/src/main/java/org/apache/parquet/pig/convert/DecimalUtils.java#L38
+   */
+  private static BigDecimal convertBinaryToDecimal(Binary value, int precision, int scale)
+  {
+    // based on the parquet-mr pig conversion, which is in turn based on the spark conversion
+    if (precision <= 18) {
+      ByteBuffer buffer = value.toByteBuffer();
+      byte[] bytes = buffer.array();
+      int start = buffer.arrayOffset() + buffer.position();
+      int end = buffer.arrayOffset() + buffer.limit();
+      long unscaled = 0L;
+      int i = start;
+      while (i < end) {
+        unscaled = (unscaled << 8 | bytes[i] & 0xff);
+        i++;
+      }
+      int bits = 8 * (end - start);
+      long unscaledNew = (unscaled << (64 - bits)) >> (64 - bits);
+      if (unscaledNew <= -Math.pow(10, 18) || unscaledNew >= Math.pow(10, 18)) {
+        return new BigDecimal(unscaledNew);
+      } else {
+        return BigDecimal.valueOf(unscaledNew / Math.pow(10, scale));
+      }
+    } else {
+      return new BigDecimal(new BigInteger(value.getBytes()), scale);
+    }
+  }
+
+
+  static boolean isWrappedListPrimitive(Object o)
+  {
+    if (o instanceof Group) {
+      Group g = (Group) o;
+      return g.getType().isRepetition(Type.Repetition.REPEATED) &&
+             !g.getType().isPrimitive() &&
+             g.getType().asGroupType().getFieldCount() == 1 &&
+             g.getType().getFields().get(0).isPrimitive();
+    }
+    return false;
+  }
+
+  private boolean binaryAsString;
+
+  ParquetGroupConverter(boolean binaryAsString)
+  {
+    this.binaryAsString = binaryAsString;
+  }
+
+  /**
+   * Convert a parquet group field as though it were a map. Logical types of 'list' and 'map' will be transformed
+   * into java lists and maps respectively ({@link ParquetGroupConverter#convertLogicalList} and
+   * {@link ParquetGroupConverter#convertLogicalMap}), repeated fields will also be translated to lists, and
+   * primitive types will be extracted into an ingestion friendly state (e.g. 'int' and 'long'). Finally,
+   * if a field is not present, this method will return null.
+   */
+  @Nullable
+  Object convertField(Group g, String fieldName)
+  {
+    return convertField(g, fieldName, binaryAsString);
+  }
+
+  /**
+   * Properly formed parquet lists, when passed through {@link ParquetGroupConverter#convertField(Group, String)},
+   * can produce lists whose elements are 'wrapped' primitives: a {@link Group} with a single primitive field (see
+   * {@link ParquetGroupConverter#isWrappedListPrimitive(Object)})
+   */
+  Object unwrapListPrimitive(Object o)
+  {
+    assert isWrappedListPrimitive(o);
+    Group g = (Group) o;
+    return convertPrimitiveField(g, 0, binaryAsString);
+  }
+}
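
Putting the converter together, usage looks roughly like the sketch below (same-package access assumed; 'group' and the field name "price" are placeholders). As a worked check of the INT96 path above: julian day 2,440,589 with zero nanoseconds of day converts to (2440589 - 2440588) * 86,400,000 = 86,400,000 ms, i.e. 1970-01-02T00:00:00Z.

    import org.apache.parquet.example.data.Group;

    ParquetGroupConverter converter = new ParquetGroupConverter(true /* binaryAsString */);
    Object value = converter.convertField(group, "price");
    // 'wrapped' list primitives may still need a final unwrap
    if (ParquetGroupConverter.isWrappedListPrimitive(value)) {
      value = converter.unwrapListPrimitive(value);
    }
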
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java
new file mode 100644
index 00000000000..cd2612ebd01
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet.simple;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.schema.Type;
+
+import javax.annotation.Nullable;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class ParquetGroupFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Group>
+{
+
+  private final Configuration jsonPathConfiguration;
+  private final ParquetGroupConverter converter;
+
+  ParquetGroupFlattenerMaker(boolean binaryAsString)
+  {
+    this.converter = new ParquetGroupConverter(binaryAsString);
+    this.jsonPathConfiguration = Configuration.builder()
+                                              .jsonProvider(new ParquetGroupJsonProvider(converter))
+                                              .mappingProvider(new NotImplementedMappingProvider())
+                                              .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
+                                              .build();
+  }
+
+  @Override
+  public Set<String> discoverRootFields(Group obj)
+  {
+    return obj.getType()
+              .getFields()
+              .stream()
+              .filter(Type::isPrimitive)
+              .map(Type::getName)
+              .collect(Collectors.toSet());
+  }
+
+  @Override
+  public Object getRootField(Group obj, String key)
+  {
+    Object val = converter.convertField(obj, key);
+    return finalizeConversion(val);
+  }
+
+  @Override
+  public Function<Group, Object> makeJsonPathExtractor(String expr)
+  {
+    final JsonPath jsonPath = JsonPath.compile(expr);
+    return record -> {
+      Object val = jsonPath.read(record, jsonPathConfiguration);
+      return finalizeConversion(val);
+    };
+  }
+
+  @Nullable
+  @Override
+  public Function<Group, Object> makeJsonQueryExtractor(String expr)
+  {
+    throw new UnsupportedOperationException("Parquet does not support JQ");
+  }
+
+  /**
+   * After JSON path conversion, wrapped list items can still need to be unwrapped. See
+   * {@link ParquetGroupConverter#isWrappedListPrimitive(Object)} and
+   * {@link ParquetGroupConverter#unwrapListPrimitive(Object)} for more details.
+   *
+   * @param o object to inspect, possibly a 'wrapped' list primitive or a list of them
+   *
+   * @return the unwrapped value when wrapping is detected, otherwise the object unchanged
+   */
+  private Object finalizeConversion(Object o)
+  {
+    // conversion can leave 'wrapped' list primitives
+    if (ParquetGroupConverter.isWrappedListPrimitive(o)) {
+      return converter.unwrapListPrimitive(o);
+    } else if (o instanceof List) {
+      List<Object> asList = (List<Object>) o;
+      if (asList.stream().allMatch(ParquetGroupConverter::isWrappedListPrimitive)) {
+        return asList.stream().map(Group.class::cast).map(converter::unwrapListPrimitive).collect(Collectors.toList());
+      }
+    }
+    return o;
+  }
+}
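
A sketch of how this maker plugs into the shared flattener machinery, mirroring what the simple ParquetHadoopInputRowParser later in this diff does ('group' is a placeholder parquet-mr Group):

    import java.util.Map;
    import org.apache.druid.java.util.common.parsers.JSONPathSpec;
    import org.apache.druid.java.util.common.parsers.ObjectFlattener;
    import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
    import org.apache.parquet.example.data.Group;

    ObjectFlattener<Group> flattener = ObjectFlatteners.create(
        JSONPathSpec.DEFAULT,
        new ParquetGroupFlattenerMaker(false /* binaryAsString */)
    );
    Map<String, Object> row = flattener.flatten(group);
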
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java
new file mode 100644
index 00000000000..78e3f3355f7
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet.simple;
+
+import com.jayway.jsonpath.InvalidJsonException;
+import com.jayway.jsonpath.spi.json.JsonProvider;
+import org.apache.parquet.example.data.Group;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Provides json path for Parquet {@link Group} objects
+ */
+public class ParquetGroupJsonProvider implements JsonProvider
+{
+  private final ParquetGroupConverter converter;
+
+  ParquetGroupJsonProvider(ParquetGroupConverter converter)
+  {
+    this.converter = converter;
+  }
+
+  @Override
+  public Object createArray()
+  {
+    return new ArrayList<>();
+  }
+
+  @Override
+  public Object createMap()
+  {
+    return new HashMap<>();
+  }
+
+  @Override
+  public boolean isArray(final Object o)
+  {
+    return o instanceof List;
+  }
+
+  @Override
+  public boolean isMap(final Object o)
+  {
+    return o instanceof Map || o instanceof Group;
+  }
+
+  @Override
+  public int length(final Object o)
+  {
+    if (o instanceof List) {
+      return ((List) o).size();
+    } else if (o instanceof Group) {
+      // both lists and maps are 'Group' type, but we should only have a group here in a map context
+      Group g = (Group) o;
+      return g.getType().getFields().size();
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public Iterable<?> toIterable(final Object o)
+  {
+    if (o instanceof List) {
+      return (List) o;
+    }
+    throw new UnsupportedOperationException(o.getClass().getName());
+  }
+
+  @Override
+  public Collection<String> getPropertyKeys(final Object o)
+  {
+    if (o instanceof Map) {
+      return ((Map<Object, Object>) o).keySet().stream().map(String::valueOf).collect(Collectors.toSet());
+    } else if (o instanceof Group) {
+      return ((Group) o).getType().getFields().stream().map(f -> f.getName()).collect(Collectors.toSet());
+    } else {
+      throw new UnsupportedOperationException(o.getClass().getName());
+    }
+  }
+
+  @Override
+  public Object getMapValue(final Object o, final String s)
+  {
+    if (o instanceof Map) {
+      return ((Map) o).get(s);
+    } else if (o instanceof Group) {
+      Group g = (Group) o;
+      return converter.convertField(g, s);
+    }
+    throw new UnsupportedOperationException(o.getClass().getName());
+  }
+
+  @Override
+  public Object getArrayIndex(final Object o, final int i)
+  {
+    if (o instanceof List) {
+      return ((List) o).get(i);
+    }
+    throw new UnsupportedOperationException(o.getClass().getName());
+  }
+
+  @Override
+  public void setArrayIndex(final Object o, final int i, final Object o1)
+  {
+    if (o instanceof List) {
+      final List list = (List) o;
+      if (list.size() == i) {
+        list.add(o1);
+      } else {
+        list.set(i, o1);
+      }
+    } else {
+      throw new UnsupportedOperationException(o.getClass().getName());
+    }
+  }
+
+  @Override
+  public void setProperty(final Object o, final Object o1, final Object o2)
+  {
+    if (o instanceof Map) {
+      ((Map) o).put(o1, o2);
+    } else {
+      throw new UnsupportedOperationException(o.getClass().getName());
+    }
+  }
+
+  @Override
+  public void removeProperty(final Object o, final Object o1)
+  {
+    if (o instanceof Map) {
+      ((Map) o).remove(o1);
+    } else {
+      throw new UnsupportedOperationException(o.getClass().getName());
+    }
+  }
+
+  @Override
+  @Deprecated
+  public Object getArrayIndex(final Object o, final int i, final boolean b)
+  {
+    throw new UnsupportedOperationException("Deprecated");
+  }
+
+  @Override
+  public Object parse(final String s) throws InvalidJsonException
+  {
+    throw new UnsupportedOperationException("Unused");
+  }
+
+  @Override
+  public Object parse(final InputStream inputStream, final String s) throws InvalidJsonException
+  {
+    throw new UnsupportedOperationException("Unused");
+  }
+
+  @Override
+  public String toJson(final Object o)
+  {
+    throw new UnsupportedOperationException("Unused");
+  }
+
+  @Override
+  public Object unwrap(final Object o)
+  {
+    throw new UnsupportedOperationException("Unused");
+  }
+}
+
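The provider is only exercised through a json-path Configuration like the one built in ParquetGroupFlattenerMaker above; a direct-use sketch (same package assumed; the path expression and 'group' are placeholders):

    import com.jayway.jsonpath.Configuration;
    import com.jayway.jsonpath.JsonPath;
    import com.jayway.jsonpath.Option;
    import java.util.EnumSet;
    import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider;
    import org.apache.parquet.example.data.Group;

    Configuration conf = Configuration.builder()
        .jsonProvider(new ParquetGroupJsonProvider(new ParquetGroupConverter(false)))
        .mappingProvider(new NotImplementedMappingProvider())
        .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
        .build();
    Object nested = JsonPath.compile("$.nestedData.listDim[0]").read(group, conf);
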
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java
new file mode 100644
index 00000000000..762bb709d1c
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet.simple;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ObjectFlattener;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+import org.apache.parquet.example.data.Group;
+
+import java.util.List;
+
+public class ParquetHadoopInputRowParser implements InputRowParser<Group>
+{
+  private final ParseSpec parseSpec;
+  private final boolean binaryAsString;
+  private final ObjectFlattener<Group> groupFlattener;
+  private final MapInputRowParser parser;
+
+  @JsonCreator
+  public ParquetHadoopInputRowParser(
+      @JsonProperty("parseSpec") ParseSpec parseSpec,
+      @JsonProperty("binaryAsString") Boolean binaryAsString
+  )
+  {
+    this.parseSpec = parseSpec;
+    this.binaryAsString = binaryAsString == null ? false : binaryAsString;
+
+    final JSONPathSpec flattenSpec;
+    if ((parseSpec instanceof ParquetParseSpec)) {
+      flattenSpec = ((ParquetParseSpec) parseSpec).getFlattenSpec();
+    } else {
+      flattenSpec = JSONPathSpec.DEFAULT;
+    }
+    this.groupFlattener = ObjectFlatteners.create(flattenSpec, new ParquetGroupFlattenerMaker(this.binaryAsString));
+    this.parser = new MapInputRowParser(parseSpec);
+  }
+
+  @Override
+  public ParseSpec getParseSpec()
+  {
+    return parseSpec;
+  }
+
+  @Override
+  public InputRowParser withParseSpec(ParseSpec parseSpec)
+  {
+    return new ParquetHadoopInputRowParser(parseSpec, binaryAsString);
+  }
+
+  @Override
+  public List<InputRow> parseBatch(Group group)
+  {
+    return parser.parseBatch(groupFlattener.flatten(group));
+  }
+}
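
A sketch of constructing this parser programmatically and parsing one record (not from the PR; column names "timestamp" and "page" are placeholders, and the TimestampSpec/DimensionsSpec constructors are assumed from core Druid):

    import com.google.common.collect.ImmutableList;
    import java.util.List;
    import org.apache.druid.data.input.InputRow;
    import org.apache.druid.data.input.impl.DimensionsSpec;
    import org.apache.druid.data.input.impl.TimestampSpec;
    import org.apache.druid.java.util.common.parsers.JSONPathSpec;
    import org.apache.parquet.example.data.Group;

    ParquetHadoopInputRowParser parser = new ParquetHadoopInputRowParser(
        new ParquetParseSpec(
            new TimestampSpec("timestamp", "auto", null),
            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("page")), null, null),
            JSONPathSpec.DEFAULT
        ),
        false /* binaryAsString */
    );
    // 'group' is a Group read by DruidParquetInputFormat
    List<InputRow> rows = parser.parseBatch(group);
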
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetParseSpec.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetParseSpec.java
new file mode 100644
index 00000000000..63e51779746
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetParseSpec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet.simple;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.NestedDataParseSpec;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+
+public class ParquetParseSpec extends NestedDataParseSpec<JSONPathSpec>
+{
+  @JsonCreator
+  public ParquetParseSpec(
+      @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
+      @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
+      @JsonProperty("flattenSpec") JSONPathSpec flattenSpec
+  )
+  {
+    super(
+        timestampSpec,
+        dimensionsSpec != null ? dimensionsSpec : DimensionsSpec.EMPTY,
+        flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT
+    );
+  }
+
+  @Override
+  public ParseSpec withTimestampSpec(TimestampSpec spec)
+  {
+    return new ParquetParseSpec(spec, getDimensionsSpec(), getFlattenSpec());
+  }
+
+  @Override
+  public ParseSpec withDimensionsSpec(DimensionsSpec spec)
+  {
+    return new ParquetParseSpec(getTimestampSpec(), spec, getFlattenSpec());
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ParquetGroupParseSpec{" +
+           "timestampSpec=" + getTimestampSpec() +
+           ", dimensionsSpec=" + getDimensionsSpec() +
+           ", flattenSpec=" + getFlattenSpec() +
+           "}";
+  }
+}
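
For a non-default flattenSpec, a sketch of wiring a JSONPath field into the parse spec (the field name and path are invented; the JSONPathFieldSpec factory method is assumed from the existing parsers package):

    import com.google.common.collect.ImmutableList;
    import org.apache.druid.data.input.impl.TimestampSpec;
    import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
    import org.apache.druid.java.util.common.parsers.JSONPathSpec;

    ParquetParseSpec spec = new ParquetParseSpec(
        new TimestampSpec("timestamp", "auto", null),
        null, // defaults to DimensionsSpec.EMPTY per the constructor above
        new JSONPathSpec(
            true, // also auto-discover root-level fields
            ImmutableList.of(JSONPathFieldSpec.createNestedField("listElement", "$.nestedData.listDim[0]"))
        )
    );
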
diff --git a/extensions-contrib/parquet-extensions/src/main/java/org/apache/parquet/avro/DruidParquetReadSupport.java b/extensions-core/parquet-extensions/src/main/java/org/apache/parquet/avro/DruidParquetAvroReadSupport.java
similarity index 82%
rename from extensions-contrib/parquet-extensions/src/main/java/org/apache/parquet/avro/DruidParquetReadSupport.java
rename to extensions-core/parquet-extensions/src/main/java/org/apache/parquet/avro/DruidParquetAvroReadSupport.java
index 954c31cc0da..e8ffde7800b 100755
--- a/extensions-contrib/parquet-extensions/src/main/java/org/apache/parquet/avro/DruidParquetReadSupport.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/parquet/avro/DruidParquetAvroReadSupport.java
@@ -23,7 +23,9 @@
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.druid.data.input.avro.AvroParseSpec;
 import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.indexer.HadoopDruidIndexerConfig;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,7 +44,7 @@
 /**
 * This class must be in package org.apache.parquet.avro to access the AvroRecordMaterializer constructor
  */
-public class DruidParquetReadSupport extends AvroReadSupport<GenericRecord>
+public class DruidParquetAvroReadSupport extends AvroReadSupport<GenericRecord>
 {
 
   /**
@@ -59,6 +61,14 @@ private MessageType getPartialReadSchema(InitContext context)
     String name = fullSchema.getName();
 
     HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
+    ParseSpec parseSpec = config.getParser().getParseSpec();
+
+    if (parseSpec instanceof AvroParseSpec) {
+      if (((AvroParseSpec) parseSpec).getFlattenSpec() != null) {
+        return fullSchema;
+      }
+    }
+
     String tsField = config.getParser().getParseSpec().getTimestampSpec().getTimestampColumn();
 
     List<DimensionSchema> dimensionSchema = config.getParser().getParseSpec().getDimensionsSpec().getDimensions();
@@ -101,6 +111,13 @@ public ReadContext init(InitContext context)
       ReadContext readContext
   )
   {
+    // coerce this value to false by default to get friendlier default behavior
+    // see https://github.com/apache/incubator-druid/issues/5433#issuecomment-388539306
+    String jobProp = "parquet.avro.add-list-element-records";
+    // getBoolean returns false when the property is unset, so the property is only left alone
+    // when a job has explicitly set it to true
+    Boolean explicitlySet = configuration.getBoolean(jobProp, false);
+    if (!explicitlySet) {
+      configuration.setBoolean(jobProp, false);
+    }
     MessageType parquetSchema = readContext.getRequestedSchema();
     Schema avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema);
 
@@ -110,7 +127,6 @@ public ReadContext init(InitContext context)
         AvroDataSupplier.class
     );
     AvroDataSupplier supplier = ReflectionUtils.newInstance(suppClass, configuration);
-    return new AvroRecordMaterializer<GenericRecord>(parquetSchema, avroSchema, supplier.get());
+    return new AvroRecordMaterializer<>(parquetSchema, avroSchema, supplier.get());
   }
-
 }
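
Since the read support coerces parquet.avro.add-list-element-records to false, a job that actually wants the parquet-avro default can opt back in by setting the property before the read support runs; a sketch ('job' is a placeholder Hadoop Job):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    Configuration conf = job.getConfiguration();
    conf.setBoolean("parquet.avro.add-list-element-records", true);
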
diff --git a/extensions-contrib/parquet-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-core/parquet-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
similarity index 100%
rename from extensions-contrib/parquet-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
rename to extensions-core/parquet-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
diff --git a/extensions-contrib/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DruidParquetInputTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetInputTest.java
similarity index 51%
rename from extensions-contrib/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DruidParquetInputTest.java
rename to extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetInputTest.java
index 24346275b80..735f6d30ef2 100644
--- a/extensions-contrib/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DruidParquetInputTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetInputTest.java
@@ -16,16 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.druid.data.input.parquet;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.util.Utf8;
+import avro.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.directory.api.util.Strings;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.parquet.avro.DruidParquetAvroInputFormat;
+import org.apache.druid.data.input.parquet.simple.DruidParquetInputFormat;
 import org.apache.druid.indexer.HadoopDruidIndexerConfig;
 import org.apache.druid.indexer.path.StaticPathSpec;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -33,73 +38,64 @@
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-
-public class DruidParquetInputTest
+class BaseParquetInputTest
 {
-  @Test
-  public void testReadParquetFile() throws IOException, InterruptedException
-  {
-    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File(
-        "example/wikipedia_hadoop_parquet_job.json")
-    );
-    Job job = Job.getInstance(new Configuration());
-    config.intoConfiguration(job);
-    GenericRecord data = getFirstRecord(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
-
-    // field not read, should return null
-    assertEquals(data.get("added"), null);
-    assertEquals(data.get("page"), new Utf8("Gypsy Danger"));
-    assertEquals(
-        ((List<InputRow>) config.getParser().parseBatch(data)).get(0).getDimension("page").get(0),
-        "Gypsy Danger"
-    );
-  }
-
-  @Test
-  public void testBinaryAsString() throws IOException, InterruptedException
+  private static Map<String, String> parseSpecType = ImmutableMap.of(
+      ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE,
+      ParquetExtensionsModule.PARQUET_AVRO_PARSE_SPEC_TYPE,
+      ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE,
+      ParquetExtensionsModule.PARQUET_SIMPLE_PARSE_SPEC_TYPE
+  );
+
+  private static Map<String, String> inputFormatType = ImmutableMap.of(
+      ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE,
+      "org.apache.druid.data.input.parquet.avro.DruidParquetAvroInputFormat",
+      ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE,
+      "org.apache.druid.data.input.parquet.simple.DruidParquetInputFormat"
+  );
+
+  private static Map<String, Class<? extends InputFormat>> inputFormatClass = ImmutableMap.of(
+      ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE,
+      DruidParquetAvroInputFormat.class,
+      ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE,
+      DruidParquetInputFormat.class
+  );
+
+  static HadoopDruidIndexerConfig transformHadoopDruidIndexerConfig(
+      String templateFile,
+      String type,
+      boolean withParseType
+  )
+      throws IOException
   {
-    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File(
-        "example/impala_hadoop_parquet_job.json")
-    );
-    Job job = Job.getInstance(new Configuration());
-    config.intoConfiguration(job);
-    GenericRecord data = getFirstRecord(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
-
-    InputRow row = ((List<InputRow>) config.getParser().parseBatch(data)).get(0);
-
-    // without binaryAsString: true, the value would something like "[104, 101, 121, 32, 116, 104, 105, 115, 32, 105, 115, 3.... ]"
-    assertEquals(row.getDimension("field").get(0), "hey this is &é(-è_çà)=^$ù*! Ω^^");
-    assertEquals(row.getTimestampFromEpoch(), 1471800234);
-  }
-
-  @Test
-  public void testDateHandling() throws IOException, InterruptedException
-  {
-    List<InputRow> rowsWithString = getAllRows("example/date_test_data_job_string.json");
-    List<InputRow> rowsWithDate = getAllRows("example/date_test_data_job_date.json");
-    assertEquals(rowsWithDate.size(), rowsWithString.size());
-
-    for (int i = 0; i < rowsWithDate.size(); i++) {
-      assertEquals(rowsWithString.get(i).getTimestamp(), rowsWithDate.get(i).getTimestamp());
+    String template = Strings.utf8ToString(Files.readAllBytes(Paths.get(templateFile)));
+    String transformed;
+    if (withParseType) {
+      transformed = StringUtils.format(template, inputFormatType.get(type), type, parseSpecType.get(type));
+    } else {
+      transformed = StringUtils.format(template, inputFormatType.get(type), type);
     }
+    return HadoopDruidIndexerConfig.fromString(transformed);
   }
 
-  private GenericRecord getFirstRecord(Job job, String parquetPath) throws IOException, InterruptedException
+
+  static Object getFirstRow(Job job, String parserType, String parquetPath) throws IOException, InterruptedException
   {
     File testFile = new File(parquetPath);
     Path path = new Path(testFile.getAbsoluteFile().toURI());
     FileSplit split = new FileSplit(path, 0, testFile.length(), null);
 
-    DruidParquetInputFormat inputFormat = ReflectionUtils.newInstance(
-        DruidParquetInputFormat.class,
+    InputFormat inputFormat = ReflectionUtils.newInstance(
+        inputFormatClass.get(parserType),
         job.getConfiguration()
     );
     TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
@@ -108,13 +104,13 @@ private GenericRecord getFirstRecord(Job job, String parquetPath) throws IOExcep
 
       reader.initialize(split, context);
       reader.nextKeyValue();
-      return (GenericRecord) reader.getCurrentValue();
+      return reader.getCurrentValue();
     }
   }
 
-  private List<InputRow> getAllRows(String configPath) throws IOException, InterruptedException
+  static List<InputRow> getAllRows(String parserType, HadoopDruidIndexerConfig config)
+      throws IOException, InterruptedException
   {
-    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File(configPath));
     Job job = Job.getInstance(new Configuration());
     config.intoConfiguration(job);
 
@@ -122,8 +118,8 @@ private GenericRecord getFirstRecord(Job job, String parquetPath) throws IOExcep
     Path path = new Path(testFile.getAbsoluteFile().toURI());
     FileSplit split = new FileSplit(path, 0, testFile.length(), null);
 
-    DruidParquetInputFormat inputFormat = ReflectionUtils.newInstance(
-        DruidParquetInputFormat.class,
+    InputFormat inputFormat = ReflectionUtils.newInstance(
+        inputFormatClass.get(parserType),
         job.getConfiguration()
     );
     TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
@@ -135,7 +131,7 @@ private GenericRecord getFirstRecord(Job job, String parquetPath) throws IOExcep
       reader.initialize(split, context);
       while (reader.nextKeyValue()) {
         reader.nextKeyValue();
-        GenericRecord data = (GenericRecord) reader.getCurrentValue();
+        Object data = reader.getCurrentValue();
         records.add(((List<InputRow>) parser.parseBatch(data)).get(0));
       }
 
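
The new BaseParquetInputTest helper above reads a JSON ingestion-spec template from disk and substitutes the Hadoop input format class, the parser type and, optionally, the parseSpec type before handing the result to HadoopDruidIndexerConfig.fromString (Druid's StringUtils.format being a locale-pinned variant of String.format). A minimal, JDK-only sketch of that substitution step, using a hypothetical template path and literal type strings as stand-ins for the ParquetExtensionsModule constants:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class TemplateSubstitutionSketch
    {
      public static void main(String[] args) throws IOException
      {
        // Hypothetical template containing three %s placeholders:
        // inputFormat class, parser type, parseSpec type.
        String template = new String(
            Files.readAllBytes(Paths.get("example/hypothetical_template.json")),
            StandardCharsets.UTF_8
        );

        // Substitute the "simple" parquet flavor; the avro flavor would plug in
        // DruidParquetAvroInputFormat and its corresponding type strings instead.
        String transformed = String.format(
            template,
            "org.apache.druid.data.input.parquet.simple.DruidParquetInputFormat",
            "parquet",      // stand-in for the simple parser type constant
            "parquet"       // stand-in for the simple parseSpec type constant
        );

        System.out.println(transformed);
      }
    }

The tests below then call config.intoConfiguration(job) so the generated spec reaches the Hadoop InputFormat before any rows are parsed.
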
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetInputTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetInputTest.java
new file mode 100644
index 00000000000..5addaecf37e
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/CompatParquetInputTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet;
+
+import avro.shaded.com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.indexer.HadoopDruidIndexerConfig;
+import org.apache.druid.indexer.path.StaticPathSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class CompatParquetInputTest extends BaseParquetInputTest
+{
+  @Parameterized.Parameters(name = "type = {0}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    return ImmutableList.of(
+        new Object[]{ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE},
+        new Object[]{ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE}
+    );
+  }
+
+  private final String parserType;
+  private final Job job;
+
+  public CompatParquetInputTest(String parserType) throws IOException
+  {
+    this.parserType = parserType;
+    this.job = Job.getInstance(new Configuration());
+  }
+
+  @Test
+  public void testBinaryAsString() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/compat/impala_hadoop_parquet_job.json",
+        parserType,
+        false
+    );
+    config.intoConfiguration(job);
+    Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
+
+    InputRow row = ((List<InputRow>) config.getParser().parseBatch(data)).get(0);
+
+    // without binaryAsString: true, the value would be something like "[104, 101, 121, 32, 116, 104, 105, 115, 32, 105, 115, 3.... ]"
+    assertEquals(row.getDimension("field").get(0), "hey this is &é(-è_çà)=^$ù*! Ω^^");
+    assertEquals(row.getTimestampFromEpoch(), 1471800234);
+  }
+
+
+  @Test
+  public void testParquet1217() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/compat/parquet_1217.json",
+        parserType,
+        true
+    );
+    config.intoConfiguration(job);
+    Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
+    List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
+    List<InputRow> rows2 = getAllRows(parserType, config);
+    assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
+    assertEquals("-1", rows.get(0).getDimension("col").get(0));
+    assertEquals(-1, rows.get(0).getMetric("metric1"));
+    assertTrue(rows2.get(2).getDimension("col").isEmpty());
+  }
+
+  @Test
+  public void testParquetThriftCompat() throws IOException, InterruptedException
+  {
+    // parquet-avro does not support this conversion:
+    // Map key type must be binary (UTF8): required int32 key
+    if (parserType.equals(ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE)) {
+      return;
+    }
+    /*
+      message ParquetSchema {
+        required boolean boolColumn;
+        required int32 byteColumn;
+        required int32 shortColumn;
+        required int32 intColumn;
+        required int64 longColumn;
+        required double doubleColumn;
+        required binary binaryColumn (UTF8);
+        required binary stringColumn (UTF8);
+        required binary enumColumn (ENUM);
+        optional boolean maybeBoolColumn;
+        optional int32 maybeByteColumn;
+        optional int32 maybeShortColumn;
+        optional int32 maybeIntColumn;
+        optional int64 maybeLongColumn;
+        optional double maybeDoubleColumn;
+        optional binary maybeBinaryColumn (UTF8);
+        optional binary maybeStringColumn (UTF8);
+        optional binary maybeEnumColumn (ENUM);
+        required group stringsColumn (LIST) {
+          repeated binary stringsColumn_tuple (UTF8);
+        }
+        required group intSetColumn (LIST) {
+          repeated int32 intSetColumn_tuple;
+        }
+        required group intToStringColumn (MAP) {
+          repeated group map (MAP_KEY_VALUE) {
+            required int32 key;
+            optional binary value (UTF8);
+          }
+        }
+        required group complexColumn (MAP) {
+          repeated group map (MAP_KEY_VALUE) {
+            required int32 key;
+            optional group value (LIST) {
+              repeated group value_tuple {
+                required group nestedIntsColumn (LIST) {
+                  repeated int32 nestedIntsColumn_tuple;
+                }
+                required binary nestedStringColumn (UTF8);
+              }
+            }
+          }
+        }
+      }
+     */
+
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/compat/parquet_thrift_compat.json",
+        parserType,
+        true
+    );
+
+    config.intoConfiguration(job);
+    Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
+    List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
+    assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
+    assertEquals("true", rows.get(0).getDimension("boolColumn").get(0));
+    assertEquals("0", rows.get(0).getDimension("byteColumn").get(0));
+    assertEquals("1", rows.get(0).getDimension("shortColumn").get(0));
+    assertEquals("2", rows.get(0).getDimension("intColumn").get(0));
+    assertEquals("0", rows.get(0).getDimension("longColumn").get(0));
+    assertEquals("0.2", rows.get(0).getDimension("doubleColumn").get(0));
+    assertEquals("val_0", rows.get(0).getDimension("binaryColumn").get(0));
+    assertEquals("val_0", rows.get(0).getDimension("stringColumn").get(0));
+    assertEquals("SPADES", rows.get(0).getDimension("enumColumn").get(0));
+    assertTrue(rows.get(0).getDimension("maybeBoolColumn").isEmpty());
+    assertTrue(rows.get(0).getDimension("maybeByteColumn").isEmpty());
+    assertTrue(rows.get(0).getDimension("maybeShortColumn").isEmpty());
+    assertTrue(rows.get(0).getDimension("maybeIntColumn").isEmpty());
+    assertTrue(rows.get(0).getDimension("maybeLongColumn").isEmpty());
+    assertTrue(rows.get(0).getDimension("maybeDoubleColumn").isEmpty());
+    assertTrue(rows.get(0).getDimension("maybeBinaryColumn").isEmpty());
+    assertTrue(rows.get(0).getDimension("maybeStringColumn").isEmpty());
+    assertTrue(rows.get(0).getDimension("maybeEnumColumn").isEmpty());
+    assertEquals("arr_0", rows.get(0).getDimension("stringsColumn").get(0));
+    assertEquals("arr_1", rows.get(0).getDimension("stringsColumn").get(1));
+    assertEquals("0", rows.get(0).getDimension("intSetColumn").get(0));
+    assertEquals("val_1", rows.get(0).getDimension("extractByLogicalMap").get(0));
+    assertEquals("1", rows.get(0).getDimension("extractByComplexLogicalMap").get(0));
+  }
+
+  @Test
+  public void testOldRepeatedInt() throws IOException, InterruptedException
+  {
+    // parquet-avro does not support this conversion:
+    // REPEATED not supported outside LIST or MAP. Type: repeated int32 repeatedInt
+    if (parserType.equals(ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE)) {
+      return;
+    }
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/compat/old_repeated_int.json",
+        parserType,
+        true
+    );
+    config.intoConfiguration(job);
+    List<InputRow> rows = getAllRows(parserType, config);
+    assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
+    assertEquals("1", rows.get(0).getDimension("repeatedInt").get(0));
+    assertEquals("2", rows.get(0).getDimension("repeatedInt").get(1));
+    assertEquals("3", rows.get(0).getDimension("repeatedInt").get(2));
+  }
+
+  @Test
+  public void testReadNestedArrayStruct() throws IOException, InterruptedException
+  {
+    // parquet-avro does not support this conversion
+    // REPEATED not supported outside LIST or MAP. Type: repeated group repeatedMessage {
+    //  optional int32 someId;
+    // }
+    if (parserType.equals(ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE)) {
+      return;
+    }
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/compat/nested_array_struct.json",
+        parserType,
+        true
+    );
+
+    config.intoConfiguration(job);
+    List<InputRow> rows = getAllRows(parserType, config);
+    assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
+    assertEquals("5", rows.get(0).getDimension("primitive").get(0));
+    assertEquals("4", rows.get(0).getDimension("extracted1").get(0));
+    assertEquals("6", rows.get(0).getDimension("extracted2").get(0));
+  }
+
+  @Test
+  public void testProtoStructWithArray() throws IOException, InterruptedException
+  {
+    // parquet-avro does not support this conversion:
+    // "REPEATED not supported outside LIST or MAP. Type: repeated int32 repeatedPrimitive"
+    if (parserType.equals(ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE)) {
+      return;
+    }
+
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/compat/proto_struct_with_array.json",
+        parserType,
+        true
+    );
+    config.intoConfiguration(job);
+    List<InputRow> rows = getAllRows(parserType, config);
+    assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
+    assertEquals("10", rows.get(0).getDimension("optionalPrimitive").get(0));
+    assertEquals("9", rows.get(0).getDimension("requiredPrimitive").get(0));
+    assertTrue(rows.get(0).getDimension("repeatedPrimitive").isEmpty());
+    assertTrue(rows.get(0).getDimension("extractedOptional").isEmpty());
+    assertEquals("9", rows.get(0).getDimension("extractedRequired").get(0));
+    assertEquals("9", rows.get(0).getDimension("extractedRepeated").get(0));
+    assertEquals("10", rows.get(0).getDimension("extractedRepeated").get(1));
+  }
+}
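
Several tests in CompatParquetInputTest bail out with an early return when the parquet-avro converter cannot handle the schema (int32 map keys, repeated fields outside LIST or MAP, and so on). One alternative worth noting is JUnit's Assume, which reports such cases as skipped instead of passed; a brief sketch of that idiom, with a hypothetical type string standing in for ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE:

    import org.junit.Assume;
    import org.junit.Test;

    public class AssumeSkipSketch
    {
      // Hypothetical stand-in for the real avro parser type constant.
      private static final String AVRO_PARSER_TYPE = "parquet-avro";

      private final String parserType = AVRO_PARSER_TYPE;

      @Test
      public void testUnsupportedForParquetAvro()
      {
        // Reported as "skipped" in the test run rather than silently passing.
        Assume.assumeFalse(
            "parquet-avro cannot convert this schema",
            AVRO_PARSER_TYPE.equals(parserType)
        );

        // ... assertions that only make sense for the simple parquet parser ...
      }
    }
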
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetInputTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetInputTest.java
new file mode 100644
index 00000000000..2660d33bc07
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/DecimalParquetInputTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet;
+
+import avro.shaded.com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.indexer.HadoopDruidIndexerConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class DecimalParquetInputTest extends BaseParquetInputTest
+{
+  @Parameterized.Parameters(name = "type = {0}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    return ImmutableList.of(
+        new Object[]{ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE},
+        new Object[]{ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE}
+    );
+  }
+
+  private final String parserType;
+  private final Job job;
+
+  public DecimalParquetInputTest(String parserType) throws IOException
+  {
+    this.parserType = parserType;
+    this.job = Job.getInstance(new Configuration());
+  }
+
+  @Test
+  public void testReadParquetDecimalFixedLen() throws IOException, InterruptedException
+  {
+    // parquet-avro does not correctly convert decimal types
+    if (parserType.equals(ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE)) {
+      return;
+    }
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/decimals/dec_in_fix_len.json",
+        parserType,
+        true
+    );
+    List<InputRow> rows = getAllRows(parserType, config);
+    assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
+    assertEquals("1.0", rows.get(0).getDimension("fixed_len_dec").get(0));
+    assertEquals(new BigDecimal("1.0"), rows.get(0).getMetric("metric1"));
+  }
+
+  @Test
+  public void testReadParquetDecimali32() throws IOException, InterruptedException
+  {
+    // parquet-avro does not correctly convert decimal types
+    if (parserType.equals(ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE)) {
+      return;
+    }
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/decimals/dec_in_i32.json",
+        parserType,
+        true
+    );
+    List<InputRow> rows = getAllRows(parserType, config);
+    assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
+    assertEquals("100", rows.get(0).getDimension("i32_dec").get(0));
+    assertEquals(new BigDecimal(100), rows.get(0).getMetric("metric1"));
+  }
+
+  @Test
+  public void testReadParquetDecimali64() throws IOException, InterruptedException
+  {
+    // parquet-avro does not correctly convert decimal types
+    if (parserType.equals(ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE)) {
+      return;
+    }
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/decimals/dec_in_i64.json",
+        parserType,
+        true
+    );
+    List<InputRow> rows = getAllRows(parserType, config);
+    assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
+    assertEquals("100", rows.get(0).getDimension("i64_dec").get(0));
+    assertEquals(new BigDecimal(100), rows.get(0).getMetric("metric1"));
+  }
+}
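
The decimal tests compare metrics against new BigDecimal("1.0") and new BigDecimal(100) with assertEquals, which relies on BigDecimal.equals and is therefore scale-sensitive: "1.0" and "1" are different values to equals even though compareTo treats them as equal. A small JDK-only sketch of the distinction the expected values have to match:

    import java.math.BigDecimal;

    public class BigDecimalScaleSketch
    {
      public static void main(String[] args)
      {
        BigDecimal oneScaled = new BigDecimal("1.0"); // scale 1
        BigDecimal one = new BigDecimal(1);           // scale 0

        System.out.println(oneScaled.equals(one));    // false: equals() compares scale too
        System.out.println(oneScaled.compareTo(one)); // 0: compareTo() ignores scale
      }
    }
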
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetInputTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetInputTest.java
new file mode 100644
index 00000000000..ee3aa6dc112
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/FlattenSpecParquetInputTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet;
+
+import avro.shaded.com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.indexer.HadoopDruidIndexerConfig;
+import org.apache.druid.indexer.path.StaticPathSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class FlattenSpecParquetInputTest extends BaseParquetInputTest
+{
+  private static final String TS1 = "2018-09-18T00:18:00.023Z";
+
+
+  @Parameterized.Parameters(name = "type = {0}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    return ImmutableList.of(
+        new Object[]{ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE},
+        new Object[]{ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE}
+    );
+  }
+
+  private final String parserType;
+  private final Job job;
+
+  public FlattenSpecParquetInputTest(String parserType) throws IOException
+  {
+    this.parserType = parserType;
+    this.job = Job.getInstance(new Configuration());
+  }
+
+  @Test
+  public void testFlat1NoFlattenSpec() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/flattening/flat_1.json",
+        parserType,
+        false
+    );
+    config.intoConfiguration(job);
+    Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
+
+    List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
+    assertEquals(TS1, rows.get(0).getTimestamp().toString());
+    assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
+    assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0));
+    assertEquals("1", rows.get(0).getDimension("dim3").get(0));
+    assertEquals("listDim1v1", rows.get(0).getDimension("listDim").get(0));
+    assertEquals("listDim1v2", rows.get(0).getDimension("listDim").get(1));
+    assertEquals(1, rows.get(0).getMetric("metric1").longValue());
+  }
+
+  @Test
+  public void testFlat1Autodiscover() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/flattening/flat_1_autodiscover_fields.json",
+        parserType,
+        true
+    );
+    config.intoConfiguration(job);
+    Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
+
+    List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
+    assertEquals(TS1, rows.get(0).getTimestamp().toString());
+    assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
+    assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0));
+    assertEquals("1", rows.get(0).getDimension("dim3").get(0));
+    assertEquals("listDim1v1", rows.get(0).getDimension("listDim").get(0));
+    assertEquals("listDim1v2", rows.get(0).getDimension("listDim").get(1));
+    assertEquals(1, rows.get(0).getMetric("metric1").longValue());
+  }
+
+  @Test
+  public void testFlat1Flatten() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/flattening/flat_1_flatten.json",
+        parserType,
+        true
+    );
+    config.intoConfiguration(job);
+    Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
+
+    List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
+    assertEquals(TS1, rows.get(0).getTimestamp().toString());
+    assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
+    assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0));
+    assertEquals("1", rows.get(0).getDimension("dim3").get(0));
+    assertEquals("listDim1v1", rows.get(0).getDimension("list").get(0));
+    assertEquals("listDim1v2", rows.get(0).getDimension("list").get(1));
+    assertEquals(1, rows.get(0).getMetric("metric1").longValue());
+  }
+
+  @Test
+  public void testFlat1FlattenSelectListItem() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/flattening/flat_1_list_index.json",
+        parserType,
+        true
+    );
+    config.intoConfiguration(job);
+    Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
+
+    List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
+    assertEquals(TS1, rows.get(0).getTimestamp().toString());
+    assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
+    assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0));
+    assertEquals("listDim1v2", rows.get(0).getDimension("listextracted").get(0));
+    assertEquals(1, rows.get(0).getMetric("metric1").longValue());
+  }
+
+
+  @Test
+  public void testNested1NoFlattenSpec() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/flattening/nested_1.json",
+        parserType,
+        false
+    );
+    config.intoConfiguration(job);
+    Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
+
+    List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
+    assertEquals(TS1, rows.get(0).getTimestamp().toString());
+    assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
+    List<String> dims = rows.get(0).getDimensions();
+    Assert.assertFalse(dims.contains("dim2"));
+    Assert.assertFalse(dims.contains("dim3"));
+    Assert.assertFalse(dims.contains("listDim"));
+    Assert.assertFalse(dims.contains("nestedData"));
+    assertEquals(1, rows.get(0).getMetric("metric1").longValue());
+  }
+
+  @Test
+  public void testNested1Autodiscover() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/flattening/nested_1_autodiscover_fields.json",
+        parserType,
+        true
+    );
+    config.intoConfiguration(job);
+    Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
+
+    List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
+    assertEquals(TS1, rows.get(0).getTimestamp().toString());
+    assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
+    List<String> dims = rows.get(0).getDimensions();
+    Assert.assertFalse(dims.contains("dim2"));
+    Assert.assertFalse(dims.contains("dim3"));
+    Assert.assertFalse(dims.contains("listDim"));
+    assertEquals(1, rows.get(0).getMetric("metric1").longValue());
+  }
+
+  @Test
+  public void testNested1Flatten() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/flattening/nested_1_flatten.json",
+        parserType,
+        true
+    );
+    config.intoConfiguration(job);
+    Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
+
+    List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
+    assertEquals(TS1, rows.get(0).getTimestamp().toString());
+    assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
+    assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0));
+    assertEquals("1", rows.get(0).getDimension("dim3").get(0));
+    assertEquals("listDim1v1", rows.get(0).getDimension("listDim").get(0));
+    assertEquals("listDim1v2", rows.get(0).getDimension("listDim").get(1));
+    assertEquals(1, rows.get(0).getMetric("metric1").longValue());
+    assertEquals(2, rows.get(0).getMetric("metric2").longValue());
+  }
+
+  @Test
+  public void testNested1FlattenSelectListItem() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/flattening/nested_1_list_index.json",
+        parserType,
+        true
+    );
+    config.intoConfiguration(job);
+    Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
+
+    List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
+    assertEquals(TS1, rows.get(0).getTimestamp().toString());
+    assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
+    assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0));
+    assertEquals("1", rows.get(0).getDimension("dim3").get(0));
+    assertEquals("listDim1v2", rows.get(0).getDimension("listextracted").get(0));
+    assertEquals(1, rows.get(0).getMetric("metric1").longValue());
+  }
+
+}
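
The list_index cases above expect a dimension named "listextracted" holding the second element of listDim, i.e. the flattenSpec pulls a single element out of a nested list by index. A rough Jackson-based sketch of that extraction idea, using JSON Pointer on a made-up record shaped like the flattening fixtures (the JSON literal is illustrative, not the actual test data):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ListIndexExtractionSketch
    {
      public static void main(String[] args) throws Exception
      {
        // Made-up record with the same shape as the flat_1 fixture dimensions.
        String json = "{\"dim1\": \"d1v1\", \"listDim\": [\"listDim1v1\", \"listDim1v2\"]}";

        JsonNode root = new ObjectMapper().readTree(json);

        // Same idea as a flattenSpec field that exposes listDim[1] as "listextracted".
        String listextracted = root.at("/listDim/1").asText();

        System.out.println(listextracted); // listDim1v2
      }
    }
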
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetInputTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetInputTest.java
new file mode 100644
index 00000000000..e06b3817ee9
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/TimestampsParquetInputTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet;
+
+import avro.shaded.com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.indexer.HadoopDruidIndexerConfig;
+import org.apache.druid.indexer.path.StaticPathSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class TimestampsParquetInputTest extends BaseParquetInputTest
+{
+  @Parameterized.Parameters(name = "type = {0}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    return ImmutableList.of(
+        new Object[]{ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE},
+        new Object[]{ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE}
+    );
+  }
+
+  private final String parserType;
+  private final Job job;
+
+  public TimestampsParquetInputTest(String parserType) throws IOException
+  {
+    this.parserType = parserType;
+    this.job = Job.getInstance(new Configuration());
+  }
+
+  @Test
+  public void testDateHandling() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig configTimeAsString = transformHadoopDruidIndexerConfig(
+        "example/timestamps/date_test_data_job_string.json",
+        parserType,
+        false
+    );
+    HadoopDruidIndexerConfig configTimeAsDate = transformHadoopDruidIndexerConfig(
+        "example/timestamps/date_test_data_job_date.json",
+        parserType,
+        false
+    );
+    List<InputRow> rowsWithString = getAllRows(parserType, configTimeAsString);
+    List<InputRow> rowsWithDate = getAllRows(parserType, configTimeAsDate);
+    assertEquals(rowsWithDate.size(), rowsWithString.size());
+
+    for (int i = 0; i < rowsWithDate.size(); i++) {
+      assertEquals(rowsWithString.get(i).getTimestamp(), rowsWithDate.get(i).getTimestamp());
+    }
+  }
+
+  @Test
+  public void testParseInt96Timestamp() throws IOException, InterruptedException
+  {
+    // parquet-avro does not support int96, but if it ever does, remove this
+    if (parserType.equals(ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE)) {
+      return;
+    }
+
+    // the source parquet file was found in apache spark sql repo tests, where it is known as impala_timestamp.parq
+    // it has a single column, "ts" which is an int96 timestamp
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/timestamps/int96_timestamp.json",
+        parserType,
+        true
+    );
+    config.intoConfiguration(job);
+    Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
+
+    List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
+    assertEquals("2001-01-01T01:01:01.000Z", rows.get(0).getTimestamp().toString());
+  }
+
+  @Test
+  public void testTimeMillisInInt64() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/timestamps/timemillis_in_i64.json",
+        parserType,
+        true
+    );
+    config.intoConfiguration(job);
+    List<InputRow> rows = getAllRows(parserType, config);
+    assertEquals("1970-01-01T00:00:00.010Z", rows.get(0).getTimestamp().toString());
+  }
+}
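
testTimeMillisInInt64 expects 1970-01-01T00:00:00.010Z, i.e. the int64 column is interpreted as milliseconds since the Unix epoch (presumably a stored value of 10 in the fixture). A one-line java.time sketch of that interpretation:

    import java.time.Instant;

    public class TimeMillisSketch
    {
      public static void main(String[] args)
      {
        // An int64 TIME_MILLIS/TIMESTAMP_MILLIS value of 10 is 10 ms after the epoch.
        System.out.println(Instant.ofEpochMilli(10)); // 1970-01-01T00:00:00.010Z
      }
    }
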
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetInputTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetInputTest.java
new file mode 100644
index 00000000000..cc1245434f6
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/WikiParquetInputTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet;
+
+import avro.shaded.com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.indexer.HadoopDruidIndexerConfig;
+import org.apache.druid.indexer.path.StaticPathSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class WikiParquetInputTest extends BaseParquetInputTest
+{
+  @Parameterized.Parameters(name = "type = {0}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    return ImmutableList.of(
+        new Object[]{ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE},
+        new Object[]{ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE}
+    );
+  }
+
+  private final String parserType;
+  private final Job job;
+
+  public WikiParquetInputTest(String parserType) throws IOException
+  {
+    this.parserType = parserType;
+    this.job = Job.getInstance(new Configuration());
+  }
+
+  @Test
+  public void testWiki() throws IOException, InterruptedException
+  {
+    HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
+        "example/wiki/wiki.json",
+        parserType,
+        false
+    );
+    config.intoConfiguration(job);
+
+    Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
+    List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
+    assertEquals(rows.get(0).getDimension("page").get(0), "Gypsy Danger");
+    String s1 = rows.get(0).getDimension("language").get(0);
+    String s2 = rows.get(0).getDimension("language").get(1);
+    assertEquals("en", s1);
+    assertEquals("zh", s2);
+  }
+}
diff --git a/pom.xml b/pom.xml
index a2f5ecf557f..554eb064ad4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,7 @@
         <module>extensions-core/kafka-extraction-namespace</module>
         <module>extensions-core/kafka-indexing-service</module>
         <module>extensions-core/mysql-metadata-storage</module>
+        <module>extensions-core/parquet-extensions</module>
         <module>extensions-core/postgresql-metadata-storage</module>
         <module>extensions-core/protobuf-extensions</module>
         <module>extensions-core/lookups-cached-global</module>
@@ -151,7 +152,6 @@
         <module>extensions-contrib/kafka-eight-simpleConsumer</module>
         <module>extensions-contrib/rabbitmq</module>
         <module>extensions-contrib/distinctcount</module>
-        <module>extensions-contrib/parquet-extensions</module>
         <module>extensions-contrib/statsd-emitter</module>
         <module>extensions-contrib/orc-extensions</module>
         <module>extensions-contrib/time-min-max</module>
@@ -1441,6 +1441,8 @@
                                 <exclude>.travis.yml</exclude>
                                 <!--DEV and IT-TESTS-->
                                 <exclude>**/*.json</exclude>
+                                <exclude>**/*.parq</exclude>
+                                <exclude>**/*.parquet</exclude>
                                 <exclude>**/jvm.config</exclude>
                                 <exclude>**/quickstart/protobuf/**</exclude>
                                 <exclude>**/tutorial/conf/**</exclude>


 
