You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/09/30 23:01:41 UTC

[2/2] drill git commit: DRILL-4139: Add missing Interval, VarBinary and Varchar with nulls partition pruning support. Fix metadata serialization for fixed_len_byte_array types. Fix partition pruning for decimal type. Fix loss of scale value for DECIMAL in parquet partition pruning.

DRILL-4139: Add missing Interval, VarBinary and Varchar with nulls partition pruning support.
Fix metadata serialization for fixed_len_byte_array types.
Fix partition pruning for decimal type.
Fix loss of scale value for DECIMAL in parquet partition pruning.
Fix partition pruning for primitive types with null values.
Update parquet table metadata version to v3_3.
DRILL-4139: Fix wrong parquet metadata cache version after resolving conflicts with DRILL-4264

closes #805


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5df49ab9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5df49ab9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5df49ab9

Branch: refs/heads/master
Commit: 5df49ab9c250114d93aa90507b029fad77f4e6bd
Parents: 1ea191f
Author: Volodymyr Vysotskyi <vv...@gmail.com>
Authored: Fri Mar 24 16:02:07 2017 +0000
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Sat Sep 30 16:01:22 2017 -0700

----------------------------------------------------------------------
 .../drill/exec/store/parquet/Metadata.java      | 101 +++++++--
 .../exec/store/parquet/MetadataVersion.java     |   9 +-
 .../exec/store/parquet/ParquetGroupScan.java    | 222 +++++++++++++++----
 .../store/parquet/ParquetReaderUtility.java     | 112 +++++++++-
 .../parquet/stat/ParquetMetaStatCollector.java  |  36 ++-
 .../store/parquet/TestParquetMetadataCache.java | 217 +++++++++++++++++-
 6 files changed, 634 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5df49ab9/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 5ac10e6..eadbeb0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Iterator;
 
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
@@ -86,6 +87,7 @@ import static org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V2;
 import static org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V3;
 import static org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V3_1;
 import static org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V3_2;
+import static org.apache.drill.exec.store.parquet.MetadataVersion.Constants.V3_3;
 
 public class Metadata {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class);
@@ -700,7 +702,8 @@ public class Metadata {
       @JsonSubTypes.Type(value = ParquetTableMetadata_v2.class, name = V2),
       @JsonSubTypes.Type(value = ParquetTableMetadata_v3.class, name = V3),
       @JsonSubTypes.Type(value = ParquetTableMetadata_v3.class, name = V3_1),
-      @JsonSubTypes.Type(value = ParquetTableMetadata_v3.class, name = V3_2)
+      @JsonSubTypes.Type(value = ParquetTableMetadata_v3.class, name = V3_2),
+      @JsonSubTypes.Type(value = ParquetTableMetadata_v3.class, name = V3_3)
       })
   public static abstract class ParquetTableMetadataBase {
 
@@ -756,7 +759,7 @@ public class Metadata {
 
     public abstract Long getNulls();
 
-    public abstract boolean hasSingleValue();
+    public abstract boolean hasSingleValue(long rowCount);
 
     public abstract Object getMinValue();
 
@@ -1054,8 +1057,36 @@ public class Metadata {
       return nulls;
     }
 
-    @Override public boolean hasSingleValue() {
-      return (max != null && min != null && max.equals(min));
+    /**
+     * Checks that the column chunk has a single value.
+     * Returns {@code true} if {@code min} and {@code max} are the same but not null
+     * and nulls count is 0 or equal to the rows count.
+     * <p>
+     * Returns {@code true} if {@code min} and {@code max} are null and the number of null values
+     * in the column chunk is equal to the rows count.
+     * <p>
+     * Comparison of nulls and rows count is needed for the cases:
+     * <ul>
+     * <li>column with primitive type has single value and null values</li>
+     *
+     * <li>column <b>with primitive type</b> has only null values, min/max couldn't be null,
+     * but column has single value</li>
+     * </ul>
+     *
+     * @param rowCount rows count in column chunk
+     * @return true if column has single value
+     */
+    @Override
+    public boolean hasSingleValue(long rowCount) {
+      if (nulls != null) {
+        if (min != null) {
+          // Objects.deepEquals() is used here, since min and max may be byte arrays
+          return Objects.deepEquals(min, max) && (nulls == 0 || nulls == rowCount);
+        } else {
+          return nulls == rowCount && max == null;
+        }
+      }
+      return false;
     }
 
     @Override public Object getMinValue() {
@@ -1363,9 +1394,24 @@ public class Metadata {
       return nulls;
     }
 
+    /**
+     * Checks that the column chunk has a single value.
+     * Returns {@code true} if {@code mxValue} is not null
+     * and nulls count is 0 or if nulls count is equal to the rows count.
+     * <p>
+     * Comparison of nulls and rows count is needed for the cases:
+     * <ul>
+     * <li>column with primitive type has single value and null values</li>
+     *
+     * <li>column <b>with binary type</b> has only null values, so column has single value</li>
+     * </ul>
+     *
+     * @param rowCount rows count in column chunk
+     * @return true if column has single value
+     */
     @Override
-    public boolean hasSingleValue() {
-      return (mxValue != null);
+    public boolean hasSingleValue(long rowCount) {
+      return (mxValue != null && nulls == 0) || nulls == rowCount;
     }
 
     @Override public Object getMinValue() {
@@ -1426,7 +1472,7 @@ public class Metadata {
 
   }
 
-  @JsonTypeName(V3_2)
+  @JsonTypeName(V3_3)
   public static class ParquetTableMetadata_v3 extends ParquetTableMetadataBase {
     @JsonProperty(value = "metadata_version", access = JsonProperty.Access.WRITE_ONLY) private String metadataVersion;
     /*
@@ -1753,9 +1799,36 @@ public class Metadata {
       return nulls;
     }
 
+    /**
+     * Checks that the column chunk has a single value.
+     * Returns {@code true} if {@code minValue} and {@code maxValue} are the same but not null
+     * and nulls count is 0 or equal to the rows count.
+     * <p>
+     * Returns {@code true} if {@code minValue} and {@code maxValue} are null and the number of null values
+     * in the column chunk is equal to the rows count.
+     * <p>
+     * Comparison of nulls and rows count is needed for the cases:
+     * <ul>
+     * <li>column with primitive type has single value and null values</li>
+     *
+     * <li>column <b>with primitive type</b> has only null values, min/max couldn't be null,
+     * but column has single value</li>
+     * </ul>
+     *
+     * @param rowCount rows count in column chunk
+     * @return true if column has single value
+     */
     @Override
-    public boolean hasSingleValue() {
-      return (minValue !=null && maxValue != null && minValue.equals(maxValue));
+    public boolean hasSingleValue(long rowCount) {
+      if (nulls != null) {
+        if (minValue != null) {
+          // Objects.deepEquals() is used here, since min and max may be byte arrays
+          return Objects.deepEquals(minValue, maxValue) && (nulls == 0 || nulls == rowCount);
+        } else {
+          return nulls == rowCount && maxValue == null;
+        }
+      }
+      return false;
     }
 
     @Override public Object getMinValue() {
@@ -1795,8 +1868,9 @@ public class Metadata {
         jgen.writeEndArray();
         if (value.minValue != null) {
           Object val;
-          if (value.primitiveType == PrimitiveTypeName.BINARY && value.minValue != null) {
-            val = new String(((Binary) value.minValue).getBytes());
+          if (value.primitiveType == PrimitiveTypeName.BINARY
+              || value.primitiveType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+            val = ((Binary) value.minValue).getBytes();
           } else {
             val = value.minValue;
           }
@@ -1804,8 +1878,9 @@ public class Metadata {
         }
         if (value.maxValue != null) {
           Object val;
-          if (value.primitiveType == PrimitiveTypeName.BINARY && value.maxValue != null) {
-            val = new String(((Binary) value.maxValue).getBytes());
+          if (value.primitiveType == PrimitiveTypeName.BINARY
+              || value.primitiveType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+            val = ((Binary) value.maxValue).getBytes();
           } else {
             val = value.maxValue;
           }

http://git-wip-us.apache.org/repos/asf/drill/blob/5df49ab9/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java
index 5ceadcd..9fd64ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/MetadataVersion.java
@@ -141,6 +141,12 @@ public class MetadataVersion implements Comparable<MetadataVersion> {
     public static final String V3_2 = "3.2";
 
     /**
+     * Version 3.3: Changed serialization of BINARY and FIXED_LEN_BYTE_ARRAY fields.<br>
+     * See DRILL-4139
+     */
+    public static final String V3_3 = "3.3";
+
+    /**
      * All historical versions of the Drill metadata cache files. In case of introducing a new parquet metadata version
      * please follow the {@link MetadataVersion#FORMAT}.
      */
@@ -149,7 +155,8 @@ public class MetadataVersion implements Comparable<MetadataVersion> {
         new MetadataVersion(V2),
         new MetadataVersion(V3),
         new MetadataVersion(V3_1),
-        new MetadataVersion(V3_2)
+        new MetadataVersion(V3_2),
+        new MetadataVersion(V3_3)
     );
 
     /**

http://git-wip-us.apache.org/repos/asf/drill/blob/5df49ab9/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index fd38127..4e38ce9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -36,6 +36,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
@@ -75,6 +76,7 @@ import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.CompleteWork;
 import org.apache.drill.exec.store.schedule.EndpointByteMap;
 import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.NullableBigIntVector;
@@ -83,6 +85,7 @@ import org.apache.drill.exec.vector.NullableDecimal18Vector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableFloat8Vector;
 import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableIntervalVector;
 import org.apache.drill.exec.vector.NullableSmallIntVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
 import org.apache.drill.exec.vector.NullableTimeVector;
@@ -360,11 +363,20 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
    * potential partition column now no longer qualifies, so it needs to be removed from the list.
    * @return whether column is a potential partition column
    */
-  private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, boolean first) {
+  private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, boolean first, long rowCount) {
     SchemaPath schemaPath = SchemaPath.getCompoundPath(columnMetadata.getName());
     final PrimitiveTypeName primitiveType;
     final OriginalType originalType;
+    int precision = 0;
+    int scale = 0;
     if (this.parquetTableMetadata.hasColumnMetadata()) {
+      // only ColumnTypeMetadata_v3 stores information about scale and precision
+      if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v3) {
+        Metadata.ColumnTypeMetadata_v3 columnTypeInfo = ((Metadata.ParquetTableMetadata_v3) parquetTableMetadata)
+                                                                          .getColumnTypeInfo(columnMetadata.getName());
+        scale = columnTypeInfo.scale;
+        precision = columnTypeInfo.precision;
+      }
       primitiveType = this.parquetTableMetadata.getPrimitiveType(columnMetadata.getName());
       originalType = this.parquetTableMetadata.getOriginalType(columnMetadata.getName());
     } else {
@@ -372,8 +384,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       originalType = columnMetadata.getOriginalType();
     }
     if (first) {
-      if (hasSingleValue(columnMetadata)) {
-        partitionColTypeMap.put(schemaPath, getType(primitiveType, originalType));
+      if (hasSingleValue(columnMetadata, rowCount)) {
+        partitionColTypeMap.put(schemaPath, getType(primitiveType, originalType, scale, precision));
         return true;
       } else {
         return false;
@@ -382,11 +394,11 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       if (!partitionColTypeMap.keySet().contains(schemaPath)) {
         return false;
       } else {
-        if (!hasSingleValue(columnMetadata)) {
+        if (!hasSingleValue(columnMetadata, rowCount)) {
           partitionColTypeMap.remove(schemaPath);
           return false;
         }
-        if (!getType(primitiveType, originalType).equals(partitionColTypeMap.get(schemaPath))) {
+        if (!getType(primitiveType, originalType, scale, precision).equals(partitionColTypeMap.get(schemaPath))) {
           partitionColTypeMap.remove(schemaPath);
           return false;
         }
@@ -395,11 +407,21 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return true;
   }
 
-  public static MajorType getType(PrimitiveTypeName type, OriginalType originalType) {
+  /**
+   * Builds the major type from the given {@code OriginalType originalType} or {@code PrimitiveTypeName type}.
+   * For DECIMAL, the returned major type carries the given scale and precision.
+   *
+   * @param type         parquet primitive type
+   * @param originalType parquet original type
+   * @param scale        type scale (used for DECIMAL type)
+   * @param precision    type precision (used for DECIMAL type)
+   * @return major type
+   */
+  public static MajorType getType(PrimitiveTypeName type, OriginalType originalType, int scale, int precision) {
     if (originalType != null) {
       switch (originalType) {
         case DECIMAL:
-          return Types.optional(MinorType.DECIMAL18);
+          return Types.withScaleAndPrecision(MinorType.DECIMAL18, TypeProtos.DataMode.OPTIONAL, scale, precision);
         case DATE:
           return Types.optional(MinorType.DATE);
         case TIME_MILLIS:
@@ -420,6 +442,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
           return Types.optional(MinorType.TINYINT);
         case INT_16:
           return Types.optional(MinorType.SMALLINT);
+        case INTERVAL:
+          return Types.optional(MinorType.INTERVAL);
       }
     }
 
@@ -444,10 +468,17 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     }
   }
 
-  private boolean hasSingleValue(ColumnMetadata columnChunkMetaData) {
+  /**
+   * Checks that the column chunk has a single value.
+   *
+   * @param columnChunkMetaData metadata to check
+   * @param rowCount            rows count in column chunk
+   * @return true if column has single value
+   */
+  private boolean hasSingleValue(ColumnMetadata columnChunkMetaData, long rowCount) {
     // ColumnMetadata will have a non-null value iff the minValue and the maxValue for the
     // rowgroup are the same
-    return (columnChunkMetaData != null) && (columnChunkMetaData.hasSingleValue());
+    return (columnChunkMetaData != null) && (columnChunkMetaData.hasSingleValue(rowCount));
   }
 
   @Override public void modifyFileSelection(FileSelection selection) {
@@ -476,129 +507,233 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
 
   public void populatePruningVector(ValueVector v, int index, SchemaPath column, String file) {
     String f = Path.getPathWithoutSchemeAndAuthority(new Path(file)).toString();
-    MinorType type = getTypeForColumn(column).getMinorType();
+    MajorType majorType = getTypeForColumn(column);
+    MinorType type = majorType.getMinorType();
     switch (type) {
       case BIT: {
         NullableBitVector bitVector = (NullableBitVector) v;
         Boolean value = (Boolean) partitionValueMap.get(f).get(column);
-        bitVector.getMutator().setSafe(index, value ? 1 : 0);
+        if (value == null) {
+          bitVector.getMutator().setNull(index);
+        } else {
+          bitVector.getMutator().setSafe(index, value ? 1 : 0);
+        }
         return;
       }
       case INT: {
         NullableIntVector intVector = (NullableIntVector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        intVector.getMutator().setSafe(index, value);
+        if (value == null) {
+          intVector.getMutator().setNull(index);
+        } else {
+          intVector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case SMALLINT: {
         NullableSmallIntVector smallIntVector = (NullableSmallIntVector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        smallIntVector.getMutator().setSafe(index, value.shortValue());
+        if (value == null) {
+          smallIntVector.getMutator().setNull(index);
+        } else {
+          smallIntVector.getMutator().setSafe(index, value.shortValue());
+        }
         return;
       }
       case TINYINT: {
         NullableTinyIntVector tinyIntVector = (NullableTinyIntVector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        tinyIntVector.getMutator().setSafe(index, value.byteValue());
+        if (value == null) {
+          tinyIntVector.getMutator().setNull(index);
+        } else {
+          tinyIntVector.getMutator().setSafe(index, value.byteValue());
+        }
         return;
       }
       case UINT1: {
         NullableUInt1Vector intVector = (NullableUInt1Vector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        intVector.getMutator().setSafe(index, value.byteValue());
+        if (value == null) {
+          intVector.getMutator().setNull(index);
+        } else {
+          intVector.getMutator().setSafe(index, value.byteValue());
+        }
         return;
       }
       case UINT2: {
         NullableUInt2Vector intVector = (NullableUInt2Vector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        intVector.getMutator().setSafe(index, (char) value.shortValue());
+        if (value == null) {
+          intVector.getMutator().setNull(index);
+        } else {
+          intVector.getMutator().setSafe(index, (char) value.shortValue());
+        }
         return;
       }
       case UINT4: {
         NullableUInt4Vector intVector = (NullableUInt4Vector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        intVector.getMutator().setSafe(index, value);
+        if (value == null) {
+          intVector.getMutator().setNull(index);
+        } else {
+          intVector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case BIGINT: {
         NullableBigIntVector bigIntVector = (NullableBigIntVector) v;
         Long value = (Long) partitionValueMap.get(f).get(column);
-        bigIntVector.getMutator().setSafe(index, value);
+        if (value == null) {
+          bigIntVector.getMutator().setNull(index);
+        } else {
+          bigIntVector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case FLOAT4: {
         NullableFloat4Vector float4Vector = (NullableFloat4Vector) v;
         Float value = (Float) partitionValueMap.get(f).get(column);
-        float4Vector.getMutator().setSafe(index, value);
+        if (value == null) {
+          float4Vector.getMutator().setNull(index);
+        } else {
+          float4Vector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case FLOAT8: {
         NullableFloat8Vector float8Vector = (NullableFloat8Vector) v;
         Double value = (Double) partitionValueMap.get(f).get(column);
-        float8Vector.getMutator().setSafe(index, value);
+        if (value == null) {
+          float8Vector.getMutator().setNull(index);
+        } else {
+          float8Vector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case VARBINARY: {
         NullableVarBinaryVector varBinaryVector = (NullableVarBinaryVector) v;
         Object s = partitionValueMap.get(f).get(column);
         byte[] bytes;
-        if (s instanceof Binary) {
-          bytes = ((Binary) s).getBytes();
-        } else if (s instanceof String) {
-          bytes = ((String) s).getBytes();
-        } else if (s instanceof byte[]) {
-          bytes = (byte[]) s;
+        if (s == null) {
+          varBinaryVector.getMutator().setNull(index);
+          return;
         } else {
-          throw new UnsupportedOperationException("Unable to create column data for type: " + type);
+          bytes = getBytes(type, s);
         }
         varBinaryVector.getMutator().setSafe(index, bytes, 0, bytes.length);
         return;
       }
       case DECIMAL18: {
         NullableDecimal18Vector decimalVector = (NullableDecimal18Vector) v;
-        Long value = (Long) partitionValueMap.get(f).get(column);
+        Object s = partitionValueMap.get(f).get(column);
+        byte[] bytes;
+        if (s == null) {
+          decimalVector.getMutator().setNull(index);
+          return;
+        } else if (s instanceof Integer) {
+          long value = DecimalUtility.getBigDecimalFromPrimitiveTypes(
+                          (Integer) s,
+                          majorType.getScale(),
+                          majorType.getPrecision()).longValue();
+          decimalVector.getMutator().setSafe(index, value);
+          return;
+        } else if (s instanceof Long) {
+          long value = DecimalUtility.getBigDecimalFromPrimitiveTypes(
+                          (Long) s,
+                          majorType.getScale(),
+                          majorType.getPrecision()).longValue();
+          decimalVector.getMutator().setSafe(index, value);
+          return;
+        } else {
+          bytes = getBytes(type, s);
+        }
+        long value = DecimalUtility.getBigDecimalFromByteArray(bytes, 0, bytes.length, majorType.getScale()).longValue();
         decimalVector.getMutator().setSafe(index, value);
         return;
       }
       case DATE: {
         NullableDateVector dateVector = (NullableDateVector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        dateVector.getMutator().setSafe(index, value * (long) DateTimeConstants.MILLIS_PER_DAY);
+        if (value == null) {
+          dateVector.getMutator().setNull(index);
+        } else {
+          dateVector.getMutator().setSafe(index, value * (long) DateTimeConstants.MILLIS_PER_DAY);
+        }
         return;
       }
       case TIME: {
         NullableTimeVector timeVector = (NullableTimeVector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        timeVector.getMutator().setSafe(index, value);
+        if (value == null) {
+          timeVector.getMutator().setNull(index);
+        } else {
+          timeVector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case TIMESTAMP: {
         NullableTimeStampVector timeStampVector = (NullableTimeStampVector) v;
         Long value = (Long) partitionValueMap.get(f).get(column);
-        timeStampVector.getMutator().setSafe(index, value);
+        if (value == null) {
+          timeStampVector.getMutator().setNull(index);
+        } else {
+          timeStampVector.getMutator().setSafe(index, value);
+        }
         return;
       }
       case VARCHAR: {
         NullableVarCharVector varCharVector = (NullableVarCharVector) v;
         Object s = partitionValueMap.get(f).get(column);
         byte[] bytes;
-        if (s instanceof String) { // if the metadata was read from a JSON cache file it maybe a string type
-          bytes = ((String) s).getBytes();
-        } else if (s instanceof Binary) {
-          bytes = ((Binary) s).getBytes();
-        } else if (s instanceof byte[]) {
-          bytes = (byte[]) s;
+        if (s == null) {
+          varCharVector.getMutator().setNull(index);
+          return;
         } else {
-          throw new UnsupportedOperationException("Unable to create column data for type: " + type);
+          bytes = getBytes(type, s);
         }
         varCharVector.getMutator().setSafe(index, bytes, 0, bytes.length);
         return;
       }
+      case INTERVAL: {
+        NullableIntervalVector intervalVector = (NullableIntervalVector) v;
+        Object s = partitionValueMap.get(f).get(column);
+        byte[] bytes;
+        if (s == null) {
+          intervalVector.getMutator().setNull(index);
+          return;
+        } else {
+          bytes = getBytes(type, s);
+        }
+        intervalVector.getMutator().setSafe(index, 1,
+          ParquetReaderUtility.getIntFromLEBytes(bytes, 0),
+          ParquetReaderUtility.getIntFromLEBytes(bytes, 4),
+          ParquetReaderUtility.getIntFromLEBytes(bytes, 8));
+        return;
+      }
       default:
         throw new UnsupportedOperationException("Unsupported type: " + type);
     }
   }
 
+  /**
+   * Returns the sequence of bytes received from {@code Object source}.
+   *
+   * @param type   the column type
+   * @param source the source of the bytes sequence
+   * @return bytes sequence obtained from {@code Object source}
+   */
+  private byte[] getBytes(MinorType type, Object source) {
+    byte[] bytes;
+    if (source instanceof Binary) {
+      bytes = ((Binary) source).getBytes();
+    } else if (source instanceof byte[]) {
+      bytes = (byte[]) source;
+    } else {
+      throw new UnsupportedOperationException("Unable to create column data for type: " + type);
+    }
+    return bytes;
+  }
+
   public static class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, FileWork {
 
     private EndpointByteMap byteMap;
@@ -684,6 +819,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     if (formatConfig.areCorruptDatesAutoCorrected()) {
       ParquetReaderUtility.correctDatesInMetadataCache(this.parquetTableMetadata);
     }
+    ParquetReaderUtility.correctBinaryInMetadataCache(parquetTableMetadata);
     List<FileStatus> fileStatuses = selection.getStatuses(fs);
 
     if (fileSet == null) {
@@ -857,7 +993,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
               columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
             }
           }
-          boolean partitionColumn = checkForPartitionColumn(column, first);
+          boolean partitionColumn = checkForPartitionColumn(column, first, rowCount);
           if (partitionColumn) {
             Map<SchemaPath, Object> map = partitionValueMap.get(file.getPath());
             if (map == null) {
@@ -871,7 +1007,13 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
                 partitionColTypeMap.remove(schemaPath);
               }
             } else {
-              map.put(schemaPath, currentValue);
+              // the stored value of a column with primitive type can never be null, so check whether the
+              // column really holds only nulls (nulls count equals rows count) and, if so, put null into the map
+              if (rowCount == column.getNulls()) {
+                map.put(schemaPath, null);
+              } else {
+                map.put(schemaPath, currentValue);
+              }
             }
           } else {
             partitionColTypeMap.remove(schemaPath);

http://git-wip-us.apache.org/repos/asf/drill/blob/5df49ab9/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 84e969a..c2580cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import com.google.common.collect.Sets;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
@@ -38,6 +40,7 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.joda.time.Chronology;
 import org.joda.time.DateTimeConstants;
 import org.apache.parquet.example.data.simple.NanoTime;
@@ -48,6 +51,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Utility class where we can capture common logic between the two parquet readers
@@ -157,11 +161,12 @@ public class ParquetReaderUtility {
         // Drill has only ever written a single row group per file, only need to correct the statistics
         // on the first row group
         Metadata.RowGroupMetadata rowGroupMetadata = file.getRowGroups().get(0);
+        Long rowCount = rowGroupMetadata.getRowCount();
         for (Metadata.ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
           // Setting Min/Max values for ParquetTableMetadata_v1
           if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
             OriginalType originalType = columnMetadata.getOriginalType();
-            if (OriginalType.DATE.equals(originalType) && columnMetadata.hasSingleValue() &&
+            if (OriginalType.DATE.equals(originalType) && columnMetadata.hasSingleValue(rowCount) &&
                 (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
               int newMinMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) columnMetadata.getMaxValue());
               columnMetadata.setMax(newMinMax);
@@ -171,7 +176,7 @@ public class ParquetReaderUtility {
           // Setting Max values for ParquetTableMetadata_v2
           else if (new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion())) &&
               columnMetadata.getName() != null && Arrays.equals(columnMetadata.getName(), names) &&
-              columnMetadata.hasSingleValue() && (Integer) columnMetadata.getMaxValue() >
+              columnMetadata.hasSingleValue(rowCount) && (Integer) columnMetadata.getMaxValue() >
               ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
             int newMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) columnMetadata.getMaxValue());
             columnMetadata.setMax(newMax);
@@ -182,6 +187,109 @@ public class ParquetReaderUtility {
   }
 
   /**
+   * Assigns byte arrays to the min/max values of BINARY and FIXED_LEN_BYTE_ARRAY columns
+   * whose values were obtained from the metadata cache file as deserialized strings.
+   *
+   * @param parquetTableMetadata table metadata that should be corrected
+   */
+  public static void correctBinaryInMetadataCache(Metadata.ParquetTableMetadataBase parquetTableMetadata) {
+    // Names of the columns with BINARY or FIXED_LEN_BYTE_ARRAY type (collected for V2 and all V3 versions)
+    Set<List<String>> columnsNames = getBinaryColumnsNames(parquetTableMetadata);
+    // The version does not change between columns, so it is parsed once instead of once per column
+    MetadataVersion version = new MetadataVersion(parquetTableMetadata.getMetadataVersion());
+    boolean isV1 = new MetadataVersion(1, 0).equals(version);
+    boolean isBeforeV33 = version.compareTo(new MetadataVersion(3, 3)) < 0;
+    for (Metadata.ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+      for (Metadata.RowGroupMetadata rowGroupMetadata : file.getRowGroups()) {
+        Long rowCount = rowGroupMetadata.getRowCount();
+        for (Metadata.ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
+          if (isV1) {
+            // ParquetTableMetadata_v1: the primitive type is taken from the column metadata itself
+            if (columnMetadata.getPrimitiveType() == PrimitiveTypeName.BINARY
+                || columnMetadata.getPrimitiveType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+              setMinMaxValues(columnMetadata, rowCount);
+            }
+          } else if (columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
+            if (isBeforeV33) {
+              // V2 and all V3 versions before V3_3: the bytes of the string are used as is
+              setMinMaxValues(columnMetadata, rowCount);
+            } else {
+              // V3_3 and all later versions: the string is Base64-decoded
+              convertMinMaxValues(columnMetadata, rowCount);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Collects the names of the columns whose primitive type is BINARY or FIXED_LEN_BYTE_ARRAY.
+   * The names are read from the {@code columnTypeInfo} of v2 or v3 (including minor versions)
+   * table metadata; for any other kind of table metadata the returned set is empty.
+   *
+   * @param parquetTableMetadata table metadata the source of the columns to check
+   * @return set of the lists with column names
+   */
+  private static Set<List<String>> getBinaryColumnsNames(Metadata.ParquetTableMetadataBase parquetTableMetadata) {
+    Set<List<String>> binaryColumns = Sets.newHashSet();
+    if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v2) {
+      Metadata.ParquetTableMetadata_v2 metadataV2 = (Metadata.ParquetTableMetadata_v2) parquetTableMetadata;
+      for (Metadata.ColumnTypeMetadata_v2 typeInfo : metadataV2.columnTypeInfo.values()) {
+        PrimitiveTypeName type = typeInfo.primitiveType;
+        if (type == PrimitiveTypeName.BINARY || type == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+          binaryColumns.add(Arrays.asList(typeInfo.name));
+        }
+      }
+    } else if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v3) {
+      Metadata.ParquetTableMetadata_v3 metadataV3 = (Metadata.ParquetTableMetadata_v3) parquetTableMetadata;
+      for (Metadata.ColumnTypeMetadata_v3 typeInfo : metadataV3.columnTypeInfo.values()) {
+        PrimitiveTypeName type = typeInfo.primitiveType;
+        if (type == PrimitiveTypeName.BINARY || type == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+          binaryColumns.add(Arrays.asList(typeInfo.name));
+        }
+      }
+    }
+    return binaryColumns;
+  }
+
+  /**
+   * Checks that column has single value and, if Min was stored as a string, replaces Min and Max
+   * in {@code columnMetadata} by its UTF-8 bytes (explicit charset instead of the JVM default).
+   *
+   * @param columnMetadata column metadata that should be changed
+   * @param rowCount       rows count in column chunk
+   */
+  private static void setMinMaxValues(Metadata.ColumnMetadata columnMetadata, long rowCount) {
+    if (columnMetadata.hasSingleValue(rowCount)) {
+      Object minValue = columnMetadata.getMinValue();
+      if (minValue instanceof String) {
+        byte[] bytes = ((String) minValue).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        columnMetadata.setMax(bytes);
+        columnMetadata.setMin(bytes);
+      }
+    }
+  }
+
+  /**
+   * Checks that column has single value and, if Min was stored as a string, replaces Min and Max
+   * in {@code columnMetadata} by the bytes decoded from that Base64 string.
+   *
+   * @param columnMetadata column metadata that should be changed
+   * @param rowCount       rows count in column chunk
+   */
+  private static void convertMinMaxValues(Metadata.ColumnMetadata columnMetadata, long rowCount) {
+    if (columnMetadata.hasSingleValue(rowCount)) {
+      Object minValue = columnMetadata.getMinValue();
+      if (minValue instanceof String) {
+        byte[] bytes = Base64.decodeBase64(((String) minValue).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        columnMetadata.setMax(bytes);
+        columnMetadata.setMin(bytes);
+      }
+    }
+  }
+
+  /**
    * Check for corrupted dates in a parquet file. See Drill-4203
    */
   public static DateCorruptionStatus detectCorruptDates(ParquetMetadata footer,

http://git-wip-us.apache.org/repos/asf/drill/blob/5df49ab9/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
index d86f863..4501cb8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -95,8 +95,17 @@ public class ParquetMetaStatCollector implements  ColumnStatCollector{
         primitiveType = this.parquetTableMetadata.getPrimitiveType(columnMetadata.getName());
         originalType = this.parquetTableMetadata.getOriginalType(columnMetadata.getName());
         final Integer repetitionLevel = this.parquetTableMetadata.getRepetitionLevel(columnMetadata.getName());
+        int precision = 0;
+        int scale = 0;
+        // ColumnTypeMetadata_v3 stores information about scale and precision
+        if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v3) {
+          Metadata.ColumnTypeMetadata_v3 columnTypeInfo = ((Metadata.ParquetTableMetadata_v3) parquetTableMetadata)
+                                                                          .getColumnTypeInfo(columnMetadata.getName());
+          scale = columnTypeInfo.scale;
+          precision = columnTypeInfo.precision;
+        }
 
-        statMap.put(schemaPath, getStat(min, max, numNull, primitiveType, originalType, repetitionLevel));
+        statMap.put(schemaPath, getStat(min, max, numNull, primitiveType, originalType, repetitionLevel, scale, precision));
       } else {
         final String columnName = schemaPath.getRootSegment().getPath();
         if (implicitColValues.containsKey(columnName)) {
@@ -117,20 +126,35 @@ public class ParquetMetaStatCollector implements  ColumnStatCollector{
     return statMap;
   }
 
+  /**
+   * Builds column statistics using given primitiveType, originalType, repetitionLevel, scale,
+   * precision, numNull, min and max values.
+   *
+   * @param min             min value for statistics
+   * @param max             max value for statistics
+   * @param numNull         num_nulls for statistics
+   * @param primitiveType   type that determines statistics class
+   * @param originalType    type that determines statistics class
+   * @param repetitionLevel field repetition level
+   * @param scale           scale value (used for DECIMAL type)
+   * @param precision       precision value (used for DECIMAL type)
+   * @return column statistics
+   */
   private ColumnStatistics getStat(Object min, Object max, Long numNull,
-      PrimitiveType.PrimitiveTypeName primitiveType, OriginalType originalType, Integer repetitionLevel) {
+                                   PrimitiveType.PrimitiveTypeName primitiveType, OriginalType originalType,
+                                   Integer repetitionLevel, int scale, int precision) {
     Statistics stat = Statistics.getStatsBasedOnType(primitiveType);
     Statistics convertedStat = stat;
 
-    TypeProtos.MajorType type = ParquetGroupScan.getType(primitiveType, originalType);
+    TypeProtos.MajorType type = ParquetGroupScan.getType(primitiveType, originalType, scale, precision);
 
     // Change to repeated if repetitionLevel > 0
     if (repetitionLevel != null && repetitionLevel > 0) {
-      type = TypeProtos.MajorType.newBuilder().setMinorType(type.getMinorType()).setMode(TypeProtos.DataMode.REPEATED).build();
+      type = Types.withScaleAndPrecision(type.getMinorType(), TypeProtos.DataMode.REPEATED, scale, precision);
     }
 
     if (numNull != null) {
-      stat.setNumNulls(numNull.longValue());
+      stat.setNumNulls(numNull);
     }
 
     if (min != null && max != null ) {

http://git-wip-us.apache.org/repos/asf/drill/blob/5df49ab9/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 7578476..ca3328e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -21,10 +21,12 @@ import com.google.common.io.Resources;
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.integration.junit4.JMockit;
+import com.google.common.collect.Lists;
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.util.TestTools;
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.exec.store.dfs.MetadataContext;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.hadoop.fs.Path;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -714,6 +716,220 @@ public class TestParquetMetadataCache extends PlanTestBase {
     }
   }
 
+  @Test // DRILL-4139
+  public void testBooleanPartitionPruning() throws Exception {
+    final String tableName = "dfs_test.tmp.`interval_bool_partition`";
+    final String rowCountMismatch = "Row count does not match the expected value";
+    final int expectedRows = 2;
+    try {
+      test("create table %s partition by (col_bln) as " +
+        "select * from cp.`parquet/alltypes_required.parquet`", tableName);
+
+      String pruningQuery = String.format("select * from %s where col_bln = true", tableName);
+      // before the metadata refresh: the plan is matched against usedMetadataFile=false
+      assertEquals(rowCountMismatch, expectedRows, testSql(pruningQuery));
+      PlanTestBase.testPlanMatchingPatterns(pruningQuery, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+      test("refresh table metadata %s", tableName);
+
+      // after the metadata refresh: the plan is matched against usedMetadataFile=true
+      assertEquals(rowCountMismatch, expectedRows, testSql(pruningQuery));
+      PlanTestBase.testPlanMatchingPatterns(pruningQuery, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+    } finally {
+      test("drop table if exists %s", tableName);
+    }
+  }
+
+  @Test // DRILL-4139
+  public void testIntervalDayPartitionPruning() throws Exception {
+    final String intervalDayPartitionTable = "dfs_test.tmp.`interval_day_partition`";
+    final String rowCountMismatch = "Row count does not match the expected value";
+    final int expectedRowCount = 1;
+    try {
+      test("create table %s partition by (col_intrvl_day) as " +
+        "select * from cp.`parquet/alltypes_optional.parquet`", intervalDayPartitionTable);
+
+      String query = String.format("select * from %s " +
+        "where col_intrvl_day = cast('P26DT27386S' as interval day)", intervalDayPartitionTable);
+      // before the metadata refresh: the plan is matched against usedMetadataFile=false
+      assertEquals(rowCountMismatch, expectedRowCount, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+      test("refresh table metadata %s", intervalDayPartitionTable);
+
+      // after the metadata refresh: the plan is matched against usedMetadataFile=true
+      assertEquals(rowCountMismatch, expectedRowCount, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+    } finally {
+      test("drop table if exists %s", intervalDayPartitionTable);
+    }
+  }
+
+  @Test // DRILL-4139
+  public void testIntervalYearPartitionPruning() throws Exception {
+    final String tableName = "dfs_test.tmp.`interval_yr_partition`";
+    final String rowCountMismatch = "Row count does not match the expected value";
+    final int expectedRows = 1;
+    try {
+      test("create table %s partition by (col_intrvl_yr) as " +
+        "select * from cp.`parquet/alltypes_optional.parquet`", tableName);
+
+      String pruningQuery = String.format("select * from %s where col_intrvl_yr = cast('P314M' as interval year)",
+        tableName);
+      // before the metadata refresh: the plan is matched against usedMetadataFile=false
+      assertEquals(rowCountMismatch, expectedRows, testSql(pruningQuery));
+      PlanTestBase.testPlanMatchingPatterns(pruningQuery, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+      test("refresh table metadata %s", tableName);
+
+      // after the metadata refresh: the plan is matched against usedMetadataFile=true
+      assertEquals(rowCountMismatch, expectedRows, testSql(pruningQuery));
+      PlanTestBase.testPlanMatchingPatterns(pruningQuery, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+    } finally {
+      test("drop table if exists %s", tableName);
+    }
+  }
+
+  @Test // DRILL-4139
+  public void testVarCharWithNullsPartitionPruning() throws Exception {
+    final String varcharPartitionTable = "dfs_test.tmp.`varchar_optional_partition`";
+    final String rowCountMismatch = "Row count does not match the expected value";
+    final int expectedRowCount = 1;
+    try {
+      test("create table %s partition by (col_vrchr) as " +
+        "select * from cp.`parquet/alltypes_optional.parquet`", varcharPartitionTable);
+
+      String query = String.format("select * from %s where col_vrchr = 'Nancy Cloke'",
+        varcharPartitionTable);
+      // before the metadata refresh: the plan is matched against usedMetadataFile=false
+      assertEquals(rowCountMismatch, expectedRowCount, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+      test("refresh table metadata %s", varcharPartitionTable);
+
+      // after the metadata refresh: the plan is matched against usedMetadataFile=true
+      assertEquals(rowCountMismatch, expectedRowCount, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+    } finally {
+      test("drop table if exists %s", varcharPartitionTable);
+    }
+  }
+
+  @Test // DRILL-4139
+  public void testDecimalPartitionPruning() throws Exception {
+    final String tableName = "dfs_test.tmp.`decimal_optional_partition`";
+    final String rowCountMismatch = "Row count does not match the expected value";
+    final int expectedRows = 6;
+    List<String> ctasStatements = Lists.newArrayList();
+    // decimal is stored as fixed_len_byte_array
+    ctasStatements.add("create table %s partition by (manager_id) as " +
+      "select * from cp.`parquet/fixedlenDecimal.parquet`");
+    // decimal is stored as int32
+    ctasStatements.add("create table %s partition by (manager_id) as " +
+      "select cast(manager_id as decimal(6, 0)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " +
+      "from cp.`parquet/fixedlenDecimal.parquet`");
+    // decimal is stored as int64
+    ctasStatements.add("create table %s partition by (manager_id) as " +
+      "select cast(manager_id as decimal(18, 6)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " +
+      "from cp.`parquet/fixedlenDecimal.parquet`");
+    for (String ctasStatement : ctasStatements) {
+      try {
+        test("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+        test(ctasStatement, tableName);
+
+        String pruningQuery = String.format("select * from %s where manager_id = 148", tableName);
+        // before the metadata refresh: the plan is matched against usedMetadataFile=false
+        assertEquals(rowCountMismatch, expectedRows, testSql(pruningQuery));
+        PlanTestBase.testPlanMatchingPatterns(pruningQuery, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+        test("refresh table metadata %s", tableName);
+
+        // after the metadata refresh: the plan is matched against usedMetadataFile=true
+        assertEquals(rowCountMismatch, expectedRows, testSql(pruningQuery));
+        PlanTestBase.testPlanMatchingPatterns(pruningQuery, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+      } finally {
+        test("drop table if exists %s", tableName);
+        test("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+      }
+    }
+  }
+
+  @Test // DRILL-4139
+  public void testIntWithNullsPartitionPruning() throws Exception {
+    final String rowCountMismatch = "Row count does not match the expected value";
+    final String pruningQuery = "select mykey from dfs_test.tmp.`t5` where mykey = 100";
+    final int expectedRows = 25;
+    try {
+      test("create table dfs_test.tmp.`t5/a` as\n" +
+        "select 100 as mykey from cp.`tpch/nation.parquet`\n" +
+        "union all\n" +
+        "select col_notexist from cp.`tpch/region.parquet`");
+
+      test("create table dfs_test.tmp.`t5/b` as\n" +
+        "select 200 as mykey from cp.`tpch/nation.parquet`\n" +
+        "union all\n" +
+        "select col_notexist from cp.`tpch/region.parquet`");
+
+      assertEquals(rowCountMismatch, expectedRows, testSql(pruningQuery));
+
+      test("refresh table metadata dfs_test.tmp.`t5`");
+      // the same row count is expected after the metadata refresh
+      assertEquals(rowCountMismatch, expectedRows, testSql(pruningQuery));
+    } finally {
+      test("drop table if exists dfs_test.tmp.`t5`");
+    }
+  }
+
+  @Test // DRILL-4139
+  public void testPartitionPruningWithIsNull() throws Exception {
+    final String rowCountMismatch = "Row count does not match the expected value";
+    final String isNullQuery = "select mykey from dfs_test.tmp.t6 where mykey is null";
+    final int expectedRows = 5;
+    try {
+      test("create table dfs_test.tmp.`t6/a` as\n" +
+        "select col_notexist as mykey from cp.`tpch/region.parquet`");
+
+      test("create table dfs_test.tmp.`t6/b` as\n" +
+        "select 100 as mykey from cp.`tpch/region.parquet`");
+
+      // before the metadata refresh: the plan is matched against usedMetadataFile=false
+      assertEquals(rowCountMismatch, expectedRows, testSql(isNullQuery));
+      PlanTestBase.testPlanMatchingPatterns(isNullQuery, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+      test("refresh table metadata dfs_test.tmp.`t6`");
+
+      assertEquals(rowCountMismatch, expectedRows, testSql(isNullQuery));
+      PlanTestBase.testPlanMatchingPatterns(isNullQuery, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+    } finally {
+      test("drop table if exists dfs_test.tmp.`t6`");
+    }
+  }
+
+  @Test // DRILL-4139
+  public void testPartitionPruningWithIsNotNull() throws Exception {
+    final String rowCountMismatch = "Row count does not match the expected value";
+    // t7/a holds only nulls, so every row matching IS NOT NULL comes from t7/b
+    final String query = "select mykey from dfs_test.tmp.t7 where mykey is not null";
+    final int expectedRowCount = 5;
+    try {
+      test("create table dfs_test.tmp.`t7/a` as\n" +
+        "select col_notexist as mykey from cp.`tpch/region.parquet`");
+
+      test("create table dfs_test.tmp.`t7/b` as\n" +
+        "select 100 as mykey from cp.`tpch/region.parquet`");
+
+      assertEquals(rowCountMismatch, expectedRowCount, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+      test("refresh table metadata dfs_test.tmp.`t7`");
+
+      assertEquals(rowCountMismatch, expectedRowCount, testSql(query));
+      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+    } finally {
+      test("drop table if exists dfs_test.tmp.`t7`");
+    }
+  }
+
   /**
    * Helper method for checking the metadata file existence
    *
@@ -726,5 +942,4 @@ public class TestParquetMetadataCache extends PlanTestBase {
     assertTrue(String.format("There is no metadata cache file for the %s table", table),
         Files.exists(metaFile.toPath()));
   }
-
 }