Posted to commits@drill.apache.org by ar...@apache.org on 2019/08/27 10:18:21 UTC

[drill] branch master updated (9c62bf1 -> 31a4199)

This is an automated email from the ASF dual-hosted git repository.

arina pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from 9c62bf1  DRILL-7350: Move RowSet related classes from test folder
     new ef93515  DRILL-7156: Support empty Parquet files creation
     new b44b712  DRILL-7356: Introduce session options for the Drill Metastore
     new ffab527  DRILL-7326: Support repeated lists for CTAS parquet format
     new 305f040  DRILL-7339: Iceberg commit upgrade and Metastore tests categorization
     new b9a61b0  DRILL-7222: Visualize estimated and actual row counts for a query
     new 31a4199  DRILL-7353: Wrong driver class is written to the java.sql.Driver

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../{JdbcTest.java => MetastoreTest.java}          |   4 +-
 .../codegen/templates/EventBasedRecordWriter.java  |  13 ++
 .../templates/ParquetOutputRecordWriter.java       | 107 +++++++++-
 .../java/org/apache/drill/exec/ExecConstants.java  |  65 ++++++
 .../exec/server/options/SystemOptionManager.java   |   9 +-
 .../exec/server/rest/profile/HtmlAttribute.java    |   2 +
 .../exec/server/rest/profile/OperatorWrapper.java  |   8 +-
 .../exec/server/rest/profile/ProfileWrapper.java   |   6 +
 .../exec/store/parquet/ParquetRecordWriter.java    | 231 +++++++++++++++++----
 .../java-exec/src/main/resources/drill-module.conf |  11 +-
 .../src/main/resources/rest/profile/profile.ftl    |  64 +++++-
 .../physical/impl/writer/TestParquetWriter.java    | 108 +++++++---
 .../impl/writer/TestParquetWriterEmptyFiles.java   | 123 +++++++++--
 .../resources/jsoninput/repeated_list_of_maps.json |   2 +
 exec/jdbc-all/pom.xml                              |  16 ++
 .../org/apache/drill/jdbc/ITTestShadedJar.java     |  28 ++-
 metastore/iceberg-metastore/pom.xml                |   2 +-
 .../drill/metastore/iceberg/IcebergBaseTest.java   |   3 +
 .../components/tables/TestBasicTablesRequests.java |   3 +
 .../tables/TestBasicTablesTransformer.java         |   3 +
 .../components/tables/TestMetastoreTableInfo.java  |   3 +
 .../tables/TestTableMetadataUnitConversion.java    |   3 +
 .../metastore/metadata/MetadataSerDeTest.java      |   3 +
 23 files changed, 706 insertions(+), 111 deletions(-)
 copy common/src/test/java/org/apache/drill/categories/{JdbcTest.java => MetastoreTest.java} (93%)
 create mode 100644 exec/java-exec/src/test/resources/jsoninput/repeated_list_of_maps.json


[drill] 03/06: DRILL-7326: Support repeated lists for CTAS parquet format

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ffab527451e0a23eca96f38bce52c790553cc47e
Author: Igor Guzenko <ih...@gmail.com>
AuthorDate: Mon Aug 19 20:02:51 2019 +0300

    DRILL-7326: Support repeated lists for CTAS parquet format
    
    closes #1844
---
 .../codegen/templates/EventBasedRecordWriter.java  |  13 ++
 .../templates/ParquetOutputRecordWriter.java       | 107 +++++++++++++++-
 .../exec/store/parquet/ParquetRecordWriter.java    | 136 ++++++++++++++++++++-
 .../physical/impl/writer/TestParquetWriter.java    | 108 ++++++++++++----
 .../resources/jsoninput/repeated_list_of_maps.json |   2 +
 5 files changed, 333 insertions(+), 33 deletions(-)
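
For orientation, this commit maps Drill's repeated types onto Parquet's three-level
logical list layout: an outer LIST-annotated group, a repeated "list" group, and an
"element" field. Below is a minimal, self-contained sketch that builds such a schema
with the same parquet-mr builder calls the diff uses (the column name "l" and the
BIGINT element type are assumptions for illustration, not taken from the commit):

    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Type.Repetition;
    import org.apache.parquet.schema.Types;

    public class ListSchemaSketch {
      public static void main(String[] args) {
        // Builds: optional group l (LIST) { repeated group list { required int64 element; } }
        GroupType listType = Types.list(Repetition.OPTIONAL)
            .element(Types.required(PrimitiveTypeName.INT64).named("element"))
            .named("l");
        System.out.println(listType);
      }
    }

At write time the matching repeated converters then emit one "list"/"element" group pair
per array value, as the writeListField() implementations added below show.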

diff --git a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
index 7357243..d87eeb3 100644
--- a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
@@ -118,6 +118,19 @@ public class EventBasedRecordWriter {
     }
 
     public abstract void writeField() throws IOException;
+
+    /**
+     * Used by repeated converters for writing Parquet logical lists.
+     *
+     * @throws IOException may be thrown by subsequent invocation of {@link #writeField()}
+     *         in overridden methods
+     * @see <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists">Lists</a>
+     */
+    public void writeListField() throws IOException {
+      throw new UnsupportedOperationException(String.format(
+          "Converter '%s' doesn't support writing list fields.",
+          getClass().getSimpleName()));
+    }
   }
 
   public static FieldConverter getConverter(RecordWriter recordWriter, int fieldId, String fieldName, FieldReader reader) {
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index ff80701..1da206d 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -71,6 +71,10 @@ import java.util.Map;
  */
 public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter implements RecordWriter {
 
+  protected static final String LIST = "list";
+  protected static final String ELEMENT = "element";
+  protected static final int ZERO_IDX = 0;
+
   private RecordConsumer consumer;
   private MessageType schema;
 
@@ -206,9 +210,9 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
               holder.buffer, holder.start, ${minor.class}Holder.nDecimalDigits, holder.scale).unscaledValue().toByteArray();
       byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})];
       if (holder.getSign(holder.start, holder.buffer)) {
-        Arrays.fill(output, 0, output.length - bytes.length, (byte)0xFF);
+        Arrays.fill(output, 0, output.length - bytes.length, (byte) -1);
       } else {
-        Arrays.fill(output, 0, output.length - bytes.length, (byte)0x0);
+        Arrays.fill(output, 0, output.length - bytes.length, (byte) 0);
       }
       System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length);
       consumer.addBinary(Binary.fromByteArray(output));
@@ -268,10 +272,109 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
     consumer.endField(fieldName, fieldId);
   </#if>
     }
+
+    <#if mode.prefix == "Repeated">
+     @Override
+     public void writeListField() {
+      if (reader.size() == 0) {
+        return;
+      }
+      consumer.startField(LIST, ZERO_IDX);
+      for (int i = 0; i < reader.size(); i++) {
+        consumer.startGroup();
+        consumer.startField(ELEMENT, ZERO_IDX);
+
+  <#if minor.class == "TinyInt" ||
+       minor.class == "UInt1" ||
+       minor.class == "UInt2" ||
+       minor.class == "SmallInt" ||
+       minor.class == "Int" ||
+       minor.class == "Time" ||
+       minor.class == "Decimal9" ||
+       minor.class == "UInt4">
+        reader.read(i, holder);
+        consumer.addInteger(holder.value);
+  <#elseif minor.class == "Float4">
+        reader.read(i, holder);
+        consumer.addFloat(holder.value);
+  <#elseif minor.class == "BigInt" ||
+            minor.class == "Decimal18" ||
+            minor.class == "TimeStamp" ||
+            minor.class == "UInt8">
+        reader.read(i, holder);
+        consumer.addLong(holder.value);
+  <#elseif minor.class == "Date">
+        reader.read(i, holder);
+        consumer.addInteger((int) (holder.value / DateTimeConstants.MILLIS_PER_DAY));
+  <#elseif minor.class == "Float8">
+        reader.read(i, holder);
+        consumer.addDouble(holder.value);
+  <#elseif minor.class == "Bit">
+        reader.read(i, holder);
+        consumer.addBoolean(holder.value == 1);
+  <#elseif minor.class == "Decimal28Sparse" ||
+            minor.class == "Decimal38Sparse">
+      <#if mode.prefix == "Repeated" >
+      <#else>
+        consumer.startField(fieldName, fieldId);
+        reader.read(holder);
+        byte[] bytes = DecimalUtility.getBigDecimalFromSparse(
+            holder.buffer, holder.start, ${minor.class}Holder.nDecimalDigits, holder.scale).unscaledValue().toByteArray();
+        byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})];
+        if (holder.getSign(holder.start, holder.buffer)) {
+          Arrays.fill(output, 0, output.length - bytes.length, (byte) -1);
+        } else {
+          Arrays.fill(output, 0, output.length - bytes.length, (byte) 0);
+        }
+        System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length);
+        consumer.addBinary(Binary.fromByteArray(output));
+        consumer.endField(fieldName, fieldId);
+      </#if>
+  <#elseif minor.class?contains("Interval")>
+        consumer.startField(fieldName, fieldId);
+        reader.read(holder);
+      <#if minor.class == "IntervalDay">
+        Arrays.fill(output, 0, 4, (byte) 0);
+        IntervalUtility.intToLEByteArray(holder.days, output, 4);
+        IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8);
+      <#elseif minor.class == "IntervalYear">
+        IntervalUtility.intToLEByteArray(holder.value, output, 0);
+        Arrays.fill(output, 4, 8, (byte) 0);
+        Arrays.fill(output, 8, 12, (byte) 0);
+      <#elseif minor.class == "Interval">
+        IntervalUtility.intToLEByteArray(holder.months, output, 0);
+        IntervalUtility.intToLEByteArray(holder.days, output, 4);
+        IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8);
+      </#if>
+        consumer.addBinary(Binary.fromByteArray(output));
+        consumer.endField(fieldName, fieldId);
+
+  <#elseif
+        minor.class == "TimeTZ" ||
+            minor.class == "Decimal28Dense" ||
+            minor.class == "Decimal38Dense">
+  <#elseif minor.class == "VarChar" || minor.class == "Var16Char"
+            || minor.class == "VarBinary" || minor.class == "VarDecimal">
+        reader.read(i, holder);
+      <#if minor.class == "VarDecimal">
+        decimalValueWriter.writeValue(consumer, holder.buffer, holder.start, holder.end,
+            reader.getField().getPrecision());
+      <#else>
+        consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start)));
+      </#if>
+  </#if>
+
+        consumer.endField(ELEMENT, ZERO_IDX);
+        consumer.endGroup();
+      }
+      consumer.endField(LIST, ZERO_IDX);
+     }
+    </#if>
   }
     </#list>
   </#list>
 </#list>
+
   private static class IntervalUtility {
     private static void intToLEByteArray(final int value, final byte[] output, final int outputIndex) {
       int shiftOrder = 0;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index a9f7f14..999fdcf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -49,6 +51,7 @@ import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -76,6 +79,7 @@ import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Type.Repetition;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.parquet.schema.Types.ListBuilder;
 
 public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class);
@@ -289,22 +293,92 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     DataMode dataMode = field.getType().getMode();
     switch (minorType) {
       case MAP:
-        List<Type> types = Lists.newArrayList();
-        for (MaterializedField childField : field.getChildren()) {
-          types.add(getType(childField));
-        }
+        List<Type> types = getChildrenTypes(field);
         return new GroupType(dataMode == DataMode.REPEATED ? Repetition.REPEATED : Repetition.OPTIONAL, field.getName(), types);
       case LIST:
-        throw new UnsupportedOperationException("Unsupported type " + minorType);
+        MaterializedField elementField = getDataField(field);
+        ListBuilder<GroupType> listBuilder = org.apache.parquet.schema.Types
+            .list(dataMode == DataMode.OPTIONAL ? Repetition.OPTIONAL : Repetition.REQUIRED);
+        addElementType(listBuilder, elementField);
+        GroupType listType = listBuilder.named(field.getName());
+        return listType;
       case NULL:
         MaterializedField newField = field.withType(
-          TypeProtos.MajorType.newBuilder().setMinorType(MinorType.INT).setMode(DataMode.OPTIONAL).build());
+            TypeProtos.MajorType.newBuilder().setMinorType(MinorType.INT).setMode(DataMode.OPTIONAL).build());
         return getPrimitiveType(newField);
       default:
         return getPrimitiveType(field);
     }
   }
 
+  /**
+   * Helper method for conversion of map child
+   * fields.
+   *
+   * @param field map
+   * @return converted child fields
+   */
+  private List<Type> getChildrenTypes(MaterializedField field) {
+    return field.getChildren().stream()
+        .map(this::getType)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * For list or repeated type possible child fields are {@link BaseRepeatedValueVector#DATA_VECTOR_NAME}
+   * and {@link BaseRepeatedValueVector#OFFSETS_VECTOR_NAME}. This method is used to find the data field.
+   *
+   * @param field parent repeated field
+   * @return child data field
+   */
+  private MaterializedField getDataField(MaterializedField field) {
+    return field.getChildren().stream()
+        .filter(child -> BaseRepeatedValueVector.DATA_VECTOR_NAME.equals(child.getName()))
+        .findAny()
+        .orElseThrow(() -> new NoSuchElementException(String.format(
+            "Failed to get elementField '%s' from list: %s",
+            BaseRepeatedValueVector.DATA_VECTOR_NAME, field.getChildren())));
+  }
+
+  /**
+   * Adds element type to {@code listBuilder} based on Drill's
+   * {@code elementField}.
+   *
+   * @param listBuilder  list schema builder
+   * @param elementField Drill's type of list elements
+   */
+  private void addElementType(ListBuilder<GroupType> listBuilder, MaterializedField elementField) {
+    if (elementField.getDataMode() == DataMode.REPEATED) {
+      ListBuilder<GroupType> inner = org.apache.parquet.schema.Types.requiredList();
+      if (elementField.getType().getMinorType() == MinorType.MAP) {
+        GroupType mapGroupType = new GroupType(Repetition.REQUIRED, ELEMENT, getChildrenTypes(elementField));
+        inner.element(mapGroupType);
+      } else {
+        MaterializedField child2 = getDataField(elementField);
+        addElementType(inner, child2);
+      }
+      listBuilder.setElementType(inner.named(ELEMENT));
+    } else {
+      Type element = getType(elementField);
+      // element may have internal name '$data$',
+      // rename it to 'element' according to Parquet list schema
+      if (element.isPrimitive()) {
+        PrimitiveType primitiveElement = element.asPrimitiveType();
+        element = new PrimitiveType(
+            primitiveElement.getRepetition(),
+            primitiveElement.getPrimitiveTypeName(),
+            ELEMENT,
+            primitiveElement.getOriginalType()
+        );
+      } else {
+        GroupType groupElement = element.asGroupType();
+        element = new GroupType(groupElement.getRepetition(),
+            ELEMENT, groupElement.getFields());
+      }
+      listBuilder.element(element);
+    }
+  }
+
   @Override
   public void checkForNewPartition(int index) {
     if (!hasPartitions) {
@@ -423,8 +497,58 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
       }
       consumer.endField(fieldName, fieldId);
     }
+
+    @Override
+    public void writeListField() throws IOException {
+      if (reader.size() == 0) {
+        return;
+      }
+      consumer.startField(LIST, ZERO_IDX);
+      while (reader.next()) {
+        consumer.startGroup();
+        consumer.startField(ELEMENT, ZERO_IDX);
+
+        consumer.startGroup();
+        for (FieldConverter converter : converters) {
+          converter.writeField();
+        }
+        consumer.endGroup();
+
+        consumer.endField(ELEMENT, ZERO_IDX);
+        consumer.endGroup();
+      }
+      consumer.endField(LIST, ZERO_IDX);
+    }
+  }
+
+  @Override
+  public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) {
+    return new RepeatedListParquetConverter(fieldId, fieldName, reader);
   }
 
+  public class RepeatedListParquetConverter extends FieldConverter {
+    private final FieldConverter converter;
+
+    RepeatedListParquetConverter(int fieldId, String fieldName, FieldReader reader) {
+      super(fieldId, fieldName, reader);
+      converter = EventBasedRecordWriter.getConverter(ParquetRecordWriter.this, 0, "", reader.reader());
+    }
+
+    @Override
+    public void writeField() throws IOException {
+      consumer.startField(fieldName, fieldId);
+      consumer.startField(LIST, ZERO_IDX);
+      while (reader.next()) {
+        consumer.startGroup();
+        consumer.startField(ELEMENT, ZERO_IDX);
+        converter.writeListField();
+        consumer.endField(ELEMENT, ZERO_IDX);
+        consumer.endGroup();
+      }
+      consumer.endField(LIST, ZERO_IDX);
+      consumer.endField(fieldName, fieldId);
+    }
+  }
 
   @Override
   public void startRecord() throws IOException {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 70fff7d..5fa618f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -17,29 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.writer;
 
-import static org.apache.drill.exec.store.parquet.ParquetRecordWriter.DRILL_VERSION_PROPERTY;
-import static org.apache.drill.test.TestBuilder.convertToLocalDateTime;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.file.Paths;
-import java.time.LocalDate;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.calcite.util.Pair;
 import org.apache.drill.categories.ParquetTest;
 import org.apache.drill.categories.SlowTest;
@@ -49,6 +26,8 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.fn.interp.TestConstantFolding;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -68,8 +47,31 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.file.Paths;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Arrays.asList;
+import static org.apache.drill.exec.store.parquet.ParquetRecordWriter.DRILL_VERSION_PROPERTY;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA;
+import static org.apache.drill.test.TestBuilder.convertToLocalDateTime;
+import static org.apache.drill.test.TestBuilder.mapOf;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.junit.Assert.assertEquals;
 
 @RunWith(Parameterized.class)
 @Category({SlowTest.class, ParquetTest.class})
@@ -1272,6 +1274,62 @@ public class TestParquetWriter extends BaseTestQuery {
     }
   }
 
+  @Test
+  public void testCtasForList() throws Exception {
+    String tableName = "testCtasForList";
+    try {
+      test("CREATE TABLE `%s`.`%s` AS SELECT l FROM cp.`jsoninput/input2.json`", DFS_TMP_SCHEMA, tableName);
+      testBuilder()
+          .sqlQuery("SELECT * FROM `%s`.`/%s` LIMIT 1", DFS_TMP_SCHEMA, tableName)
+          .unOrdered()
+          .baselineColumns("l")
+          .baselineValues(asList(4L, 2L))
+          .go();
+    } finally {
+      test("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
+    }
+  }
+
+  @Test
+  public void testCtasForRepeatedList() throws Exception {
+    String tableName = "testCtasForRepeatedList";
+    try {
+      test("CREATE TABLE `%s`.`%s` AS SELECT * FROM cp.`jsoninput/repeated_list_bug.json`", DFS_TMP_SCHEMA, tableName);
+      testBuilder()
+          .sqlQuery("SELECT rl FROM `%s`.`/%s`", DFS_TMP_SCHEMA, tableName)
+          .unOrdered()
+          .baselineColumns("rl")
+          .baselineValues(asList(asList(4L, 6L), asList(2L, 3L)))
+          .baselineValues(asList(asList(9L, 7L), asList(4L, 8L)))
+          .go();
+    } finally {
+      test("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
+    }
+  }
+
+  @Test
+  public void testCtasForRepeatedListOfMaps() throws Exception {
+    String tableName = "testCtasForRepeatedListOfMaps";
+    try {
+      test("CREATE TABLE `%s`.`%s` AS SELECT * FROM cp.`jsoninput/repeated_list_of_maps.json`", DFS_TMP_SCHEMA, tableName);
+      testBuilder()
+          .sqlQuery("SELECT * FROM `%s`.`/%s`", DFS_TMP_SCHEMA, tableName)
+          .unOrdered()
+          .baselineColumns("rma")
+          .baselineValues(asList(
+              asList(mapOf("a", 1L, "b", "2"), mapOf("a", 2L, "b", "3")),
+              asList(mapOf("a", 3L, "b", "3"))
+          ))
+          .baselineValues(asList(
+              asList(mapOf("a", 4L, "b", "4"), mapOf("a", 5L, "b", "5"), mapOf("a", 6L, "b", "6")),
+              asList(mapOf("a", 7L, "b", "7"))
+          ))
+          .go();
+    } finally {
+      test("DROP TABLE IF EXISTS `%s`.`%s`", DFS_TMP_SCHEMA, tableName);
+    }
+  }
+
   /**
    * Checks that specified parquet table contains specified columns with specified types.
    *
diff --git a/exec/java-exec/src/test/resources/jsoninput/repeated_list_of_maps.json b/exec/java-exec/src/test/resources/jsoninput/repeated_list_of_maps.json
new file mode 100644
index 0000000..842468f
--- /dev/null
+++ b/exec/java-exec/src/test/resources/jsoninput/repeated_list_of_maps.json
@@ -0,0 +1,2 @@
+{ "rma": [[{"a": 1, "b": "2"},{"a": 2, "b": "3"}], [{"a": 3, "b": "3"}]]}
+{ "rma": [[{"a": 4, "b": "4"},{"a": 5, "b": "5"},{"a": 6, "b": "6"}], [{"a": 7, "b": "7"}]]}
\ No newline at end of file
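
To connect the schema above with the writer events: for a row where column "l" holds
[4, 2], the new converters drive the Parquet RecordConsumer roughly as sketched below.
The inner "list"/"element" events mirror writeListField() from this commit; the outer
column-level startField/startGroup calls are assumptions about the calling writer:

    import org.apache.parquet.io.api.RecordConsumer;

    final class ListEventsSketch {
      // Emits the events for a list column "l" holding the values [4, 2].
      static void writeList(RecordConsumer consumer) {
        consumer.startField("l", 0);
        consumer.startGroup();                    // the LIST-annotated group
        consumer.startField("list", 0);
        for (long value : new long[] {4L, 2L}) {
          consumer.startGroup();                  // one repeated "list" entry
          consumer.startField("element", 0);
          consumer.addLong(value);
          consumer.endField("element", 0);
          consumer.endGroup();
        }
        consumer.endField("list", 0);
        consumer.endGroup();
        consumer.endField("l", 0);
      }
    }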


[drill] 02/06: DRILL-7356: Introduce session options for the Drill Metastore

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit b44b7127306f68c250beafb5a4e7980303fd5cf1
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Thu Aug 22 19:01:15 2019 +0300

    DRILL-7356: Introduce session options for the Drill Metastore
    
    closes #1846
---
 .../java/org/apache/drill/exec/ExecConstants.java  | 63 ++++++++++++++++++++++
 .../exec/server/options/SystemOptionManager.java   |  9 +++-
 .../java-exec/src/main/resources/drill-module.conf | 10 +++-
 3 files changed, 80 insertions(+), 2 deletions(-)
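
The commit only declares the validators and registers them; whether each option has any
effect yet is governed by the code that reads it (the descriptions note most are "not
active for now"). A hedged usage sketch of toggling one option per session over JDBC
(the local connection URL is an assumption for illustration, not part of the commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ToggleMetastoreOption {
      public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
             Statement stmt = conn.createStatement()) {
          // Session-scoped option introduced by this commit; its default is false.
          stmt.execute("ALTER SESSION SET `metastore.enabled` = true");
        }
      }
    }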

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 6f3f17d..549d374 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -1044,4 +1044,67 @@ public final class ExecConstants {
   public static final RangeLongValidator QUERY_MAX_ROWS_VALIDATOR = new RangeLongValidator(QUERY_MAX_ROWS, 0, Integer.MAX_VALUE,
       new OptionDescription("The maximum number of rows that the query will return. This can be only set at a SYSTEM level by an admin. (Drill 1.16+)"));
 
+  /**
+   * Option that enables Drill Metastore usage.
+   */
+  public static final String METASTORE_ENABLED = "metastore.enabled";
+  public static final BooleanValidator METASTORE_ENABLED_VALIDATOR = new BooleanValidator(METASTORE_ENABLED,
+      new OptionDescription("Enables Drill Metastore usage to be able to store table metadata " +
+          "during ANALYZE TABLE commands execution and to be able to read table metadata during regular " +
+          "queries execution or when querying some INFORMATION_SCHEMA tables. " +
+          "This option is not active for now. Default is false. (Drill 1.17+)"));
+
+  /**
+   * Option for specifying maximum level depth for collecting metadata
+   * which will be stored in the Drill Metastore. For example, when {@code FILE} level value
+   * is set, {@code ROW_GROUP} level metadata won't be collected and stored into the Metastore.
+   */
+  public static final String METASTORE_METADATA_STORE_DEPTH_LEVEL = "metastore.metadata.store.depth_level";
+  public static final EnumeratedStringValidator METASTORE_METADATA_STORE_DEPTH_LEVEL_VALIDATOR = new EnumeratedStringValidator(METASTORE_METADATA_STORE_DEPTH_LEVEL,
+      new OptionDescription("Specifies maximum level depth for collecting metadata. " +
+          "This option is not active for now. Default is 'ROW_GROUP'. (Drill 1.17+)"),
+      "TABLE", "SEGMENT", "PARTITION", "FILE", "ROW_GROUP");
+
+  /**
+   * Option for enabling schema usage, stored to the Metastore.
+   */
+  public static final String METASTORE_USE_SCHEMA_METADATA = "metastore.metadata.use_schema";
+  public static final BooleanValidator METASTORE_USE_SCHEMA_METADATA_VALIDATOR = new BooleanValidator(METASTORE_USE_SCHEMA_METADATA,
+      new OptionDescription("Enables schema usage, stored to the Metastore. " +
+          "This option is not active for now. Default is false. (Drill 1.17+)"));
+
+  /**
+   * Option for enabling statistics usage, stored in the Metastore, at the planning stage.
+   */
+  public static final String METASTORE_USE_STATISTICS_METADATA = "metastore.metadata.use_statistics";
+  public static final BooleanValidator METASTORE_USE_STATISTICS_METADATA_VALIDATOR = new BooleanValidator(METASTORE_USE_STATISTICS_METADATA,
+      new OptionDescription("Enables statistics usage, stored in the Metastore, at the planning stage. " +
+          "This option is not active for now. Default is false. (Drill 1.17+)"));
+
+  /**
+   * Option for collecting schema and / or column statistics for every table after CTAS and CTTAS execution.
+   */
+  public static final String METASTORE_CTAS_AUTO_COLLECT_METADATA = "metastore.metadata.ctas.auto-collect";
+  public static final EnumeratedStringValidator METASTORE_CTAS_AUTO_COLLECT_METADATA_VALIDATOR = new EnumeratedStringValidator(METASTORE_CTAS_AUTO_COLLECT_METADATA,
+      new OptionDescription("Specifies whether schema and / or column statistics will be " +
+          "automatically collected for every table after CTAS and CTTAS. " +
+          "This option is not active for now. Default is 'NONE'. (Drill 1.17+)"),
+      "NONE", "ALL", "SCHEMA");
+
+  /**
+   * Option for allowing using file metadata cache if required metadata is absent in the Metastore.
+   */
+  public static final String METASTORE_FALLBACK_TO_FILE_METADATA = "metastore.metadata.fallback_to_file_metadata";
+  public static final BooleanValidator METASTORE_FALLBACK_TO_FILE_METADATA_VALIDATOR = new BooleanValidator(METASTORE_FALLBACK_TO_FILE_METADATA,
+      new OptionDescription("Allows using file metadata cache for the case when required metadata is absent in the Metastore. " +
+          "This option is not active for now. Default is true. (Drill 1.17+)"));
+
+  /**
+   * Option for specifying the number of attempts for retrying query planning after detecting that query metadata is changed.
+   */
+  public static final String METASTORE_RETRIVAL_RETRY_ATTEMPTS = "metastore.retrival.retry_attempts";
+  public static final IntegerValidator METASTORE_RETRIVAL_RETRY_ATTEMPTS_VALIDATOR = new IntegerValidator(METASTORE_RETRIVAL_RETRY_ATTEMPTS,
+      new OptionDescription("Specifies the number of attempts for retrying query planning after detecting that query metadata is changed. " +
+          "If the number of retries was exceeded, query will be planned without metadata information from the Metastore. " +
+          "This option is not active for now. Default is 5. (Drill 1.17+)"));
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 88bba32..d4289fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -287,7 +287,14 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.RM_QUERY_TAGS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SESSION_AND_QUERY, false, false)),
       new OptionDefinition(ExecConstants.RM_QUEUES_WAIT_FOR_PREFERRED_NODES_VALIDATOR),
       new OptionDefinition(ExecConstants.TDIGEST_COMPRESSION_VALIDATOR),
-      new OptionDefinition(ExecConstants.QUERY_MAX_ROWS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.ALL, true, false))
+      new OptionDefinition(ExecConstants.QUERY_MAX_ROWS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.ALL, true, false)),
+      new OptionDefinition(ExecConstants.METASTORE_ENABLED_VALIDATOR),
+      new OptionDefinition(ExecConstants.METASTORE_METADATA_STORE_DEPTH_LEVEL_VALIDATOR),
+      new OptionDefinition(ExecConstants.METASTORE_USE_SCHEMA_METADATA_VALIDATOR),
+      new OptionDefinition(ExecConstants.METASTORE_USE_STATISTICS_METADATA_VALIDATOR),
+      new OptionDefinition(ExecConstants.METASTORE_CTAS_AUTO_COLLECT_METADATA_VALIDATOR),
+      new OptionDefinition(ExecConstants.METASTORE_FALLBACK_TO_FILE_METADATA_VALIDATOR),
+      new OptionDefinition(ExecConstants.METASTORE_RETRIVAL_RETRY_ATTEMPTS_VALIDATOR)
     };
 
     CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions)
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index bb8f292..422d850 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -691,5 +691,13 @@ drill.exec.options: {
     # ========= rm related options ===========
     exec.rm.queryTags: "",
     exec.rm.queues.wait_for_preferred_nodes: true,
-    exec.statistics.tdigest_compression: 100
+    exec.statistics.tdigest_compression: 100,
+    # ========= Metastore related options ===========
+    metastore.enabled: false,
+    metastore.retrival.retry_attempts: 5
+    metastore.metadata.store.depth_level: "ROW_GROUP",
+    metastore.metadata.use_schema: false,
+    metastore.metadata.use_statistics: false,
+    metastore.metadata.ctas.auto-collect: "NONE",
+    metastore.metadata.fallback_to_file_metadata: true
 }


[drill] 04/06: DRILL-7339: Iceberg commit upgrade and Metastore tests categorization

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 305f040ae5c4a3851bf139e9ed4b4efb70b09dd8
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Wed Aug 14 19:17:46 2019 +0300

    DRILL-7339: Iceberg commit upgrade and Metastore tests categorization
    
    1. Upgraded Iceberg commit to fix issue with deletes in transaction
    2. Categorize Metastore tests
    
    closes #1842
---
 .../org/apache/drill/categories/MetastoreTest.java | 24 ++++++++++++++++++++++
 metastore/iceberg-metastore/pom.xml                |  2 +-
 .../drill/metastore/iceberg/IcebergBaseTest.java   |  3 +++
 .../components/tables/TestBasicTablesRequests.java |  3 +++
 .../tables/TestBasicTablesTransformer.java         |  3 +++
 .../components/tables/TestMetastoreTableInfo.java  |  3 +++
 .../tables/TestTableMetadataUnitConversion.java    |  3 +++
 .../metastore/metadata/MetadataSerDeTest.java      |  3 +++
 8 files changed, 43 insertions(+), 1 deletion(-)
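
The categorization relies on the standard JUnit 4 category mechanism: MetastoreTest is an
empty marker interface and each test class opts in with @Category, so build tooling can
include or exclude the whole group. A minimal sketch of the pattern with illustrative
names (the real marker lives in common's org.apache.drill.categories package):

    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    @Category(ExampleCategorizedTest.MetastoreCategory.class)
    public class ExampleCategorizedTest {
      /** Marker interface used purely as a JUnit 4 category label. */
      public interface MetastoreCategory {}

      @Test
      public void runsWhenCategoryIsIncluded() {
        // Surefire/Failsafe can select or skip this class by category group.
      }
    }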

diff --git a/common/src/test/java/org/apache/drill/categories/MetastoreTest.java b/common/src/test/java/org/apache/drill/categories/MetastoreTest.java
new file mode 100644
index 0000000..9da1e03
--- /dev/null
+++ b/common/src/test/java/org/apache/drill/categories/MetastoreTest.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.categories;
+
+/**
+ * This is a category used to mark unit tests that test the Drill Metastore and its components.
+ */
+public interface MetastoreTest {
+}
diff --git a/metastore/iceberg-metastore/pom.xml b/metastore/iceberg-metastore/pom.xml
index d935750..41d4690 100644
--- a/metastore/iceberg-metastore/pom.xml
+++ b/metastore/iceberg-metastore/pom.xml
@@ -33,7 +33,7 @@
   <name>metastore/Drill Iceberg Metastore</name>
 
   <properties>
-    <iceberg.version>08e0873</iceberg.version>
+    <iceberg.version>1b0b9c2</iceberg.version>
     <caffeine.version>2.7.0</caffeine.version>
   </properties>
 
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/IcebergBaseTest.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/IcebergBaseTest.java
index 0250ce9..5ef7b80 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/IcebergBaseTest.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/IcebergBaseTest.java
@@ -19,6 +19,7 @@ package org.apache.drill.metastore.iceberg;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.categories.MetastoreTest;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.GuavaPatcher;
 import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
@@ -28,11 +29,13 @@ import org.apache.hadoop.fs.Path;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 
+@Category(MetastoreTest.class)
 public abstract class IcebergBaseTest {
 
   @ClassRule
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java
index 4a8baf7..9c6c45c 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java
@@ -17,8 +17,10 @@
  */
 package org.apache.drill.metastore.components.tables;
 
+import org.apache.drill.categories.MetastoreTest;
 import org.apache.drill.metastore.expressions.FilterExpression;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -28,6 +30,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+@Category(MetastoreTest.class)
 public class TestBasicTablesRequests {
 
   @Test
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesTransformer.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesTransformer.java
index 84002ff..1bd9f6a 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesTransformer.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesTransformer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.metastore.components.tables;
 
+import org.apache.drill.categories.MetastoreTest;
 import org.apache.drill.metastore.metadata.BaseTableMetadata;
 import org.apache.drill.metastore.metadata.FileMetadata;
 import org.apache.drill.metastore.metadata.MetadataInfo;
@@ -25,6 +26,7 @@ import org.apache.drill.metastore.metadata.PartitionMetadata;
 import org.apache.drill.metastore.metadata.RowGroupMetadata;
 import org.apache.drill.metastore.metadata.SegmentMetadata;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -33,6 +35,7 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+@Category(MetastoreTest.class)
 public class TestBasicTablesTransformer {
 
   @Test
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestMetastoreTableInfo.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestMetastoreTableInfo.java
index bc80c66..90ce610 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestMetastoreTableInfo.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestMetastoreTableInfo.java
@@ -17,12 +17,15 @@
  */
 package org.apache.drill.metastore.components.tables;
 
+import org.apache.drill.categories.MetastoreTest;
 import org.apache.drill.metastore.metadata.TableInfo;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+@Category(MetastoreTest.class)
 public class TestMetastoreTableInfo {
 
   @Test
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java
index 359c0cb..7f49947 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.metastore.components.tables;
 
+import org.apache.drill.categories.MetastoreTest;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -36,6 +37,7 @@ import org.apache.drill.metastore.statistics.StatisticsHolder;
 import org.apache.hadoop.fs.Path;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -50,6 +52,7 @@ import java.util.stream.Collectors;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+@Category(MetastoreTest.class)
 public class TestTableMetadataUnitConversion {
 
   private static Data data;
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/metadata/MetadataSerDeTest.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/metadata/MetadataSerDeTest.java
index eb44741..6a2d36d 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/metadata/MetadataSerDeTest.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/metadata/MetadataSerDeTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.metastore.metadata;
 
+import org.apache.drill.categories.MetastoreTest;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.metastore.statistics.BaseStatisticsKind;
 import org.apache.drill.metastore.statistics.ColumnStatistics;
@@ -24,6 +25,7 @@ import org.apache.drill.metastore.statistics.ColumnStatisticsKind;
 import org.apache.drill.metastore.statistics.StatisticsHolder;
 import org.apache.drill.metastore.statistics.TableStatisticsKind;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.math.BigDecimal;
 import java.util.Arrays;
@@ -33,6 +35,7 @@ import java.util.Objects;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+@Category(MetastoreTest.class)
 public class MetadataSerDeTest {
 
   @Test


[drill] 06/06: DRILL-7353: Wrong driver class is written to the java.sql.Driver

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 31a41995c3f708894cc77bad3b27ce72203c423c
Author: Anton Gozhiy <an...@gmail.com>
AuthorDate: Mon Aug 19 20:33:14 2019 +0300

    DRILL-7353: Wrong driver class is written to the java.sql.Driver
    
    closes #1845
---
 exec/jdbc-all/pom.xml                              | 16 +++++++++++++
 .../org/apache/drill/jdbc/ITTestShadedJar.java     | 28 ++++++++++++++++++----
 2 files changed, 39 insertions(+), 5 deletions(-)
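
Background for the fix: DriverManager auto-registers JDBC drivers through the
META-INF/services/java.sql.Driver service file, so whichever entry survives shading is
the driver that gets loaded without Class.forName(). A small sketch, using only JDK
APIs, that lists the drivers visible through that mechanism (illustrative, not part of
the commit):

    import java.sql.Driver;
    import java.util.ServiceLoader;

    public class ListVisibleJdbcDrivers {
      public static void main(String[] args) {
        // With the fixed jdbc-all jar on the classpath this should report
        // org.apache.drill.jdbc.Driver instead of the relocated Avatica driver.
        for (Driver driver : ServiceLoader.load(Driver.class)) {
          System.out.println(driver.getClass().getName());
        }
      }
    }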

diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 2dab193..13234fd 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -496,6 +496,14 @@
                <exclude>webapps/**</exclude>
              </excludes>
            </filter>
+           <!-- This file is used to automatically load given jdbc driver without calling Class.forName().
+                Excluding the Avatica service file which is conflicting with the Drill one. -->
+           <filter>
+             <artifact>org.apache.calcite.avatica:*</artifact>
+             <excludes>
+               <exclude>META-INF/services/java.sql.Driver</exclude>
+             </excludes>
+           </filter>
          </filters>
         </configuration>
       </plugin>
@@ -799,6 +807,14 @@
                       <exclude>webapps/**</exclude>
                     </excludes>
                   </filter>
+                  <!-- This file is used to automatically load given jdbc driver without calling Class.forName().
+                       Excluding the Avatica service file which is conflicting with the Drill one. -->
+                  <filter>
+                    <artifact>org.apache.calcite.avatica:*</artifact>
+                    <excludes>
+                      <exclude>META-INF/services/java.sql.Driver</exclude>
+                    </excludes>
+                  </filter>
                 </filters>
               </configuration>
             </plugin>
diff --git a/exec/jdbc-all/src/test/java/org/apache/drill/jdbc/ITTestShadedJar.java b/exec/jdbc-all/src/test/java/org/apache/drill/jdbc/ITTestShadedJar.java
index 4fed146..c343037 100644
--- a/exec/jdbc-all/src/test/java/org/apache/drill/jdbc/ITTestShadedJar.java
+++ b/exec/jdbc-all/src/test/java/org/apache/drill/jdbc/ITTestShadedJar.java
@@ -17,9 +17,16 @@
  */
 package org.apache.drill.jdbc;
 
-import static org.junit.Assert.assertEquals;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.MalformedURLException;
@@ -32,11 +39,10 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Vector;
 import java.util.concurrent.Semaphore;
+import java.util.stream.Collectors;
 
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 public class ITTestShadedJar {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ITTestShadedJar.class);
@@ -166,6 +172,18 @@ public class ITTestShadedJar {
 
   }
 
+  @Test
+  public void serviceFileContainsCorrectDriver() throws IOException {
+    URLClassLoader loader = URLClassLoader.newInstance(new URL[]{getJdbcUrl()});
+    try (InputStream resourceStream = loader.getResourceAsStream("META-INF/services/java.sql.Driver")) {
+      assertNotNull("java.sql.Driver is not present in the jdbc jar", resourceStream);
+      try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceStream))) {
+        String driverClass = reader.lines().collect(Collectors.joining(System.lineSeparator()));
+        assertEquals("org.apache.drill.jdbc.Driver", driverClass);
+      }
+    }
+  }
+
   private static void printQuery(Connection c, String query) throws SQLException {
     final StringBuilder sb = new StringBuilder();
 


[drill] 01/06: DRILL-7156: Support empty Parquet files creation

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ef93515f88caf232f00fd6a4b37a6751592bea8a
Author: Oleg Zinoviev <oz...@solit-clouds.ru>
AuthorDate: Sun Jun 16 21:21:46 2019 +0300

    DRILL-7156: Support empty Parquet files creation
    
    closes #1836
---
 .../exec/store/parquet/ParquetRecordWriter.java    |  95 +++++++++-------
 .../impl/writer/TestParquetWriterEmptyFiles.java   | 123 +++++++++++++++++----
 2 files changed, 159 insertions(+), 59 deletions(-)
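
The heart of the change is the reworked flush path: flush(boolean cleanUp) still flushes
buffered rows as before, but it now also writes a file with zero rows when this is the
final cleanup, nothing has been written yet, and a non-empty schema is known. A condensed
sketch of that decision (method and parameter names are illustrative; the real logic is
in flush() below):

    import org.apache.parquet.schema.MessageType;

    final class EmptyFileDecisionSketch {
      static boolean shouldWriteEmptyFile(boolean cleanUp, boolean nothingWrittenYet,
                                          MessageType schema, int bufferedRecordCount) {
        if (bufferedRecordCount > 0) {
          return false; // normal path: buffered rows are flushed as a regular block
        }
        // Empty file only on cleanup, when no file was produced and the schema has fields.
        return cleanUp && nothingWrittenYet && schema != null && schema.getFieldCount() > 0;
      }
    }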

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 5a64f40..a9f7f14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -122,6 +122,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private PrimitiveTypeName logicalTypeForDecimals;
   private boolean usePrimitiveTypesForDecimals;
 
+  /** Used to ensure that an empty Parquet file is written when no rows were provided. */
+  private boolean empty = true;
+
   public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException {
     this.oContext = context.newOperatorContext(writer);
     this.codecFactory = CodecFactory.createDirectCodecFactory(writer.getFormatPlugin().getFsConf(),
@@ -205,7 +208,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   public void updateSchema(VectorAccessible batch) throws IOException {
     if (this.batchSchema == null || !this.batchSchema.equals(batch.getSchema()) || containsComplexVectors(this.batchSchema)) {
       if (this.batchSchema != null) {
-        flush();
+        flush(false);
       }
       this.batchSchema = batch.getSchema();
       newSchema();
@@ -310,7 +313,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     try {
       boolean newPartition = newPartition(index);
       if (newPartition) {
-        flush();
+        flush(false);
         newSchema();
       }
     } catch (Exception e) {
@@ -318,19 +321,18 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     }
   }
 
-  private void flush() throws IOException {
+  private void flush(boolean cleanUp) throws IOException {
     try {
       if (recordCount > 0) {
-        parquetFileWriter.startBlock(recordCount);
-        consumer.flush();
-        store.flush();
-        pageStore.flushToFileWriter(parquetFileWriter);
-        recordCount = 0;
-        parquetFileWriter.endBlock();
-
-        // we are writing one single block per file
-        parquetFileWriter.end(extraMetaData);
-        parquetFileWriter = null;
+        flushParquetFileWriter();
+      } else if (cleanUp && empty && schema != null && schema.getFieldCount() > 0) {
+        // Write empty parquet if:
+        // 1) This is a cleanup - no additional records can be written
+        // 2) No file was written until this moment
+        // 3) Schema is set
+        // 4) Schema is not empty
+        createParquetFileWriter();
+        flushParquetFileWriter();
       }
     } finally {
       store.close();
@@ -347,7 +349,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
       long memSize = store.getBufferedSize();
       if (memSize > blockSize) {
         logger.debug("Reached block size " + blockSize);
-        flush();
+        flush(false);
         newSchema();
         recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
       } else {
@@ -435,29 +437,10 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
     // we wait until there is at least one record before creating the parquet file
     if (parquetFileWriter == null) {
-      Path path = new Path(location, prefix + "_" + index + ".parquet");
-      // to ensure that our writer was the first to create output file, we create empty file first and fail if file exists
-      Path firstCreatedPath = storageStrategy.createFileAndApply(fs, path);
-
-      // since parquet reader supports partitions, it means that several output files may be created
-      // if this writer was the one to create table folder, we store only folder and delete it with its content in case of abort
-      // if table location was created before, we store only files created by this writer and delete them in case of abort
-      addCleanUpLocation(fs, firstCreatedPath);
-
-      // since ParquetFileWriter will overwrite empty output file (append is not supported)
-      // we need to re-apply file permission
-      if (useSingleFSBlock) {
-        // Passing blockSize creates files with this blockSize instead of filesystem default blockSize.
-        // Currently, this is supported only by filesystems included in
-        // BLOCK_FS_SCHEMES (ParquetFileWriter.java in parquet-mr), which includes HDFS.
-        // For other filesystems, it uses default blockSize configured for the file system.
-        parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE, blockSize, 0);
-      } else {
-        parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE);
-      }
-      storageStrategy.applyToFile(fs, path);
-      parquetFileWriter.start();
+      createParquetFileWriter();
     }
+
+    empty = false;
     recordCount++;
     checkBlockSizeReached();
   }
@@ -486,11 +469,49 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   @Override
   public void cleanup() throws IOException {
-    flush();
+    flush(true);
 
     codecFactory.release();
   }
 
+  private void createParquetFileWriter() throws IOException {
+    Path path = new Path(location, prefix + "_" + index + ".parquet");
+    // to ensure that our writer was the first to create output file, we create empty file first and fail if file exists
+    Path firstCreatedPath = storageStrategy.createFileAndApply(fs, path);
+
+    // since parquet reader supports partitions, it means that several output files may be created
+    // if this writer was the one to create table folder, we store only folder and delete it with its content in case of abort
+    // if table location was created before, we store only files created by this writer and delete them in case of abort
+    addCleanUpLocation(fs, firstCreatedPath);
+
+    // since ParquetFileWriter will overwrite empty output file (append is not supported)
+    // we need to re-apply file permission
+    if (useSingleFSBlock) {
+      // Passing blockSize creates files with this blockSize instead of filesystem default blockSize.
+      // Currently, this is supported only by filesystems included in
+      // BLOCK_FS_SCHEMES (ParquetFileWriter.java in parquet-mr), which includes HDFS.
+      // For other filesystems, it uses default blockSize configured for the file system.
+      parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE, blockSize, 0);
+    } else {
+      parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE);
+    }
+    storageStrategy.applyToFile(fs, path);
+    parquetFileWriter.start();
+  }
+
+  private void flushParquetFileWriter() throws IOException {
+    parquetFileWriter.startBlock(recordCount);
+    consumer.flush();
+    store.flush();
+    pageStore.flushToFileWriter(parquetFileWriter);
+    recordCount = 0;
+    parquetFileWriter.endBlock();
+
+    // we are writing one single block per file
+    parquetFileWriter.end(extraMetaData);
+    parquetFileWriter = null;
+  }
+
   /**
    * Adds passed location to the list of locations to be cleaned up in case of abort.
    * Add locations if:
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
index bc72234..d2ae653 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
@@ -18,16 +18,24 @@
 package org.apache.drill.exec.physical.impl.writer;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchemaBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.ParquetTest;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.exec.ExecConstants;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.io.File;
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @Category({ParquetTest.class, UnlikelyTest.class})
 public class TestParquetWriterEmptyFiles extends BaseTestQuery {
@@ -35,42 +43,110 @@ public class TestParquetWriterEmptyFiles extends BaseTestQuery {
   @BeforeClass
   public static void initFs() throws Exception {
     updateTestCluster(3, null);
+    dirTestWatcher.copyResourceToRoot(Paths.get("schemachange"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "empty"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "alltypes_required.parquet"));
   }
 
-  @Test // see DRILL-2408
+  @Test
   public void testWriteEmptyFile() throws Exception {
     final String outputFileName = "testparquetwriteremptyfiles_testwriteemptyfile";
     final File outputFile = FileUtils.getFile(dirTestWatcher.getDfsTestTmpDir(), outputFileName);
 
     test("CREATE TABLE dfs.tmp.%s AS SELECT * FROM cp.`employee.json` WHERE 1=0", outputFileName);
-    Assert.assertFalse(outputFile.exists());
+    assertTrue(outputFile.exists());
   }
 
   @Test
-  public void testMultipleWriters() throws Exception {
-    final String outputFile = "testparquetwriteremptyfiles_testmultiplewriters";
+  public void testWriteEmptyFileWithSchema() throws Exception {
+    final String outputFileName = "testparquetwriteremptyfiles_testwriteemptyfilewithschema";
 
-    runSQL("alter session set `planner.slice_target` = 1");
+    test("CREATE TABLE dfs.tmp.%s AS select * from dfs.`parquet/alltypes_required.parquet` where `col_int` = 0", outputFileName);
 
-    try {
-      final String query = "SELECT position_id FROM cp.`employee.json` WHERE position_id IN (15, 16) GROUP BY position_id";
+    // Only the last scan schema is written
+    SchemaBuilder schemaBuilder = new SchemaBuilder()
+      .add("col_int", TypeProtos.MinorType.INT)
+      .add("col_chr", TypeProtos.MinorType.VARCHAR)
+      .add("col_vrchr", TypeProtos.MinorType.VARCHAR)
+      .add("col_dt", TypeProtos.MinorType.DATE)
+      .add("col_tim", TypeProtos.MinorType.TIME)
+      .add("col_tmstmp", TypeProtos.MinorType.TIMESTAMP)
+      .add("col_flt", TypeProtos.MinorType.FLOAT4)
+      .add("col_intrvl_yr", TypeProtos.MinorType.INTERVAL)
+      .add("col_intrvl_day", TypeProtos.MinorType.INTERVAL)
+      .add("col_bln", TypeProtos.MinorType.BIT);
+    BatchSchema expectedSchema = new BatchSchemaBuilder()
+      .withSchemaBuilder(schemaBuilder)
+      .build();
 
-      test("CREATE TABLE dfs.tmp.%s AS %s", outputFile, query);
+    testBuilder()
+      .unOrdered()
+      .sqlQuery("select * from dfs.tmp.%s", outputFileName)
+      .schemaBaseLine(expectedSchema)
+      .go();
+  }
 
-      // this query will fail if an "empty" file was created
-      testBuilder()
-        .unOrdered()
-        .sqlQuery("SELECT * FROM dfs.tmp.%s", outputFile)
-        .sqlBaselineQuery(query)
-        .go();
-    } finally {
-      runSQL("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
-    }
+  @Test
+  public void testWriteEmptyFileWithEmptySchema() throws Exception {
+    final String outputFileName = "testparquetwriteremptyfiles_testwriteemptyfileemptyschema";
+    final File outputFile = FileUtils.getFile(dirTestWatcher.getDfsTestTmpDir(), outputFileName);
+
+    test("CREATE TABLE dfs.tmp.%s AS SELECT * FROM cp.`empty.json`", outputFileName);
+    assertFalse(outputFile.exists());
   }
 
-  @Test // see DRILL-2408
+  @Test
+  public void testWriteEmptySchemaChange() throws Exception {
+    final String outputFileName = "testparquetwriteremptyfiles_testwriteemptyschemachange";
+    final File outputFile = FileUtils.getFile(dirTestWatcher.getDfsTestTmpDir(), outputFileName);
+
+    test("CREATE TABLE dfs.tmp.%s AS select id, a, b from dfs.`schemachange/multi/*.json` WHERE id = 0", outputFileName);
+
+    // Only the last scan schema is written
+    SchemaBuilder schemaBuilder = new SchemaBuilder()
+      .addNullable("id", TypeProtos.MinorType.BIGINT)
+      .addNullable("a", TypeProtos.MinorType.BIGINT)
+      .addNullable("b", TypeProtos.MinorType.BIT);
+    BatchSchema expectedSchema = new BatchSchemaBuilder()
+      .withSchemaBuilder(schemaBuilder)
+      .build();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery("select * from dfs.tmp.%s", outputFileName)
+      .schemaBaseLine(expectedSchema)
+      .go();
+
+    // Make sure that only 1 parquet file was created
+    assertEquals(1, outputFile.list((dir, name) -> name.endsWith("parquet")).length);
+  }
+
+  @Test
+  public void testComplexEmptyFileSchema() throws Exception {
+    final String outputFileName = "testparquetwriteremptyfiles_testcomplexemptyfileschema";
+
+    test("create table dfs.tmp.%s as select * from dfs.`parquet/empty/complex/empty_complex.parquet`", outputFileName);
+
+    // end_date column is null, so it is missing from the result schema.
+    SchemaBuilder schemaBuilder = new SchemaBuilder()
+      .addNullable("id", TypeProtos.MinorType.BIGINT)
+      .addNullable("name", TypeProtos.MinorType.VARCHAR)
+      .addArray("orders", TypeProtos.MinorType.BIGINT);
+    BatchSchema expectedSchema = new BatchSchemaBuilder()
+      .withSchemaBuilder(schemaBuilder)
+      .build();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery("select * from dfs.tmp.%s", outputFileName)
+      .schemaBaseLine(expectedSchema)
+      .go();
+  }
+
+  @Test
   public void testWriteEmptyFileAfterFlush() throws Exception {
-    final String outputFile = "testparquetwriteremptyfiles_test_write_empty_file_after_flush";
+    final String outputFileName = "testparquetwriteremptyfiles_test_write_empty_file_after_flush";
+    final File outputFile = FileUtils.getFile(dirTestWatcher.getDfsTestTmpDir(), outputFileName);
 
     try {
       // this specific value will force a flush just after the final row is written
@@ -78,12 +154,15 @@ public class TestParquetWriterEmptyFiles extends BaseTestQuery {
       test("ALTER SESSION SET `store.parquet.block-size` = 19926");
 
       final String query = "SELECT * FROM cp.`employee.json` LIMIT 100";
-      test("CREATE TABLE dfs.tmp.%s AS %s", outputFile, query);
+      test("CREATE TABLE dfs.tmp.%s AS %s", outputFileName, query);
+
+      // Make sure that only 1 parquet file was created
+      assertEquals(1, outputFile.list((dir, name) -> name.endsWith("parquet")).length);
 
       // this query will fail if an "empty" file was created
       testBuilder()
         .unOrdered()
-        .sqlQuery("SELECT * FROM dfs.tmp.%s", outputFile)
+        .sqlQuery("SELECT * FROM dfs.tmp.%s", outputFileName)
         .sqlBaselineQuery(query)
         .go();
     } finally {


[drill] 05/06: DRILL-7222: Visualize estimated and actual row counts for a query

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit b9a61b04b622ff4be0fa87091d3c879148d9b1ed
Author: Kunal Khatua <kk...@maprtech.com>
AuthorDate: Thu Aug 22 10:00:58 2019 -0700

    DRILL-7222: Visualize estimated and actual row counts for a query
    
    With statistics in place, it is useful to show the estimated rowcount alongside the actual rowcount in the query profile's operator overview. A toggle button allows this, with the estimated rows hidden by default.
    
    We can extract this from the Physical Plan section of the profile.
    Added a toggle-ready table-column header.
    
    closes #1779
---
 .../java/org/apache/drill/exec/ExecConstants.java  |  2 +
 .../exec/server/rest/profile/HtmlAttribute.java    |  2 +
 .../exec/server/rest/profile/OperatorWrapper.java  |  8 ++-
 .../exec/server/rest/profile/ProfileWrapper.java   |  6 ++
 .../java-exec/src/main/resources/drill-module.conf |  1 +
 .../src/main/resources/rest/profile/profile.ftl    | 64 +++++++++++++++++++---
 6 files changed, 74 insertions(+), 9 deletions(-)
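
The rowcount estimates themselves are scraped client-side in profile.ftl (see buildRowCountMap() in the diff below), which matches each physical plan line against an operator id and its "rowcount = ..." attribute. As a rough, hypothetical Java equivalent of that extraction (assuming plan lines of the form "00-03 Project(...) : rowType = ..., rowcount = 463.0, cumulative cost = ..."), the logic amounts to:

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only; RowCountExtractor is not part of this patch.
public class RowCountExtractor {
  private static final Pattern OP_ID = Pattern.compile("\\d+-\\d+");
  private static final Pattern ROW_COUNT = Pattern.compile("rowcount = ([^,]+)");

  // Maps an operator id such as "00-03" to its estimated rowcount.
  public static Map<String, Double> extract(String physicalPlanText) {
    Map<String, Double> estimates = new HashMap<>();
    for (String line : physicalPlanText.split("\n")) {
      Matcher id = OP_ID.matcher(line);
      Matcher count = ROW_COUNT.matcher(line);
      if (id.find() && count.find()) {
        estimates.put(id.group(), Double.parseDouble(count.group(1).trim()));
      }
    }
    return estimates;
  }
}
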

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 549d374..463b0a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -875,6 +875,8 @@ public final class ExecConstants {
   public static final BooleanValidator DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR = new BooleanValidator(DYNAMIC_UDF_SUPPORT_ENABLED,
       new OptionDescription("Enables users to dynamically upload UDFs. Users must upload their UDF (source and binary) JAR files to a staging directory in the distributed file system before issuing the CREATE FUNCTION USING JAR command to register a UDF. Default is true. (Drill 1.9+)"));
 
+  //Display estimated rows in operator overview by default
+  public static final String PROFILE_STATISTICS_ESTIMATED_ROWS_SHOW = "drill.exec.http.profile.statistics.estimated_rows.show";
   //Trigger warning in UX if fragments appear to be doing no work (units are in seconds).
   public static final String PROFILE_WARNING_PROGRESS_THRESHOLD = "drill.exec.http.profile.warning.progress.threshold";
   //Trigger warning in UX if slowest fragment operator crosses min threshold and exceeds ratio with average (units are in seconds).
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/HtmlAttribute.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/HtmlAttribute.java
index 75db298..95c0507 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/HtmlAttribute.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/HtmlAttribute.java
@@ -23,6 +23,7 @@ package org.apache.drill.exec.server.rest.profile;
 public class HtmlAttribute {
   //Attributes
   public static final String CLASS = "class";
+  public static final String KEY = "key";
   public static final String DATA_ORDER = "data-order";
   public static final String TITLE = "title";
   public static final String SPILLS = "spills";
@@ -33,5 +34,6 @@ public class HtmlAttribute {
   public static final String CLASS_VALUE_NO_PROGRESS_TAG = "no-progress-tag";
   public static final String CLASS_VALUE_TIME_SKEW_TAG = "time-skew-tag";
   public static final String CLASS_VALUE_SCAN_WAIT_TAG = "scan-wait-tag";
+  public static final String CLASS_VALUE_EST_ROWS_ANCHOR = "estRowsAnchor";
   public static final String STYLE_VALUE_CURSOR_HELP = "cursor:help;";
 }
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
index 0f61170..2e593b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
@@ -143,7 +143,7 @@ public class OperatorWrapper {
       OverviewTblTxt.AVG_SETUP_TIME, OverviewTblTxt.MAX_SETUP_TIME,
       OverviewTblTxt.AVG_PROCESS_TIME, OverviewTblTxt.MAX_PROCESS_TIME,
       OverviewTblTxt.MIN_WAIT_TIME, OverviewTblTxt.AVG_WAIT_TIME, OverviewTblTxt.MAX_WAIT_TIME,
-      OverviewTblTxt.PERCENT_FRAGMENT_TIME, OverviewTblTxt.PERCENT_QUERY_TIME, OverviewTblTxt.ROWS,
+      OverviewTblTxt.PERCENT_FRAGMENT_TIME, OverviewTblTxt.PERCENT_QUERY_TIME, OverviewTblTxt.ROWS.concat(OverviewTblTxt.ESTIMATED_ROWS),
       OverviewTblTxt.AVG_PEAK_MEMORY, OverviewTblTxt.MAX_PEAK_MEMORY
   };
 
@@ -269,7 +269,10 @@ public class OperatorWrapper {
     tb.appendPercent(processSum / majorBusyNanos);
     tb.appendPercent(processSum / majorFragmentBusyTallyTotal);
 
-    tb.appendFormattedInteger(recordSum);
+    Map<String, String> estRowcountMap = new HashMap<>();
+    estRowcountMap.put(HtmlAttribute.CLASS, HtmlAttribute.CLASS_VALUE_EST_ROWS_ANCHOR);
+    estRowcountMap.put(HtmlAttribute.KEY, path.replaceAll("-xx-", "-"));
+    tb.appendFormattedInteger(recordSum, estRowcountMap);
 
     final ImmutablePair<OperatorProfile, Integer> peakMem = Collections.max(opList, Comparators.operatorPeakMemory);
 
@@ -419,6 +422,7 @@ public class OperatorWrapper {
     static final String PERCENT_FRAGMENT_TIME = "% Fragment Time";
     static final String PERCENT_QUERY_TIME = "% Query Time";
     static final String ROWS = "Rows";
+    static final String ESTIMATED_ROWS = "<div class='estRows' title='Estimated'>(Estimated)</div>";
     static final String AVG_PEAK_MEMORY = "Avg Peak Memory";
     static final String MAX_PEAK_MEMORY = "Max Peak Memory";
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
index e6e6318..5c144eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
@@ -65,6 +65,7 @@ public class ProfileWrapper {
   private Map<String, String> physicalOperatorMap;
   private final String noProgressWarningThreshold;
   private final int defaultAutoLimit;
+  private boolean showEstimatedRows;
 
   public ProfileWrapper(final QueryProfile profile, DrillConfig drillConfig) {
     this.profile = profile;
@@ -134,6 +135,7 @@ public class ProfileWrapper {
 
     this.onlyImpersonationEnabled = WebServer.isOnlyImpersonationEnabled(drillConfig);
     this.noProgressWarningThreshold = String.valueOf(drillConfig.getInt(ExecConstants.PROFILE_WARNING_PROGRESS_THRESHOLD));
+    this.showEstimatedRows = drillConfig.getBoolean(ExecConstants.PROFILE_STATISTICS_ESTIMATED_ROWS_SHOW);
   }
 
   private long tallyMajorFragmentCost(List<MajorFragmentProfile> majorFragments) {
@@ -390,4 +392,8 @@ public class ProfileWrapper {
       physicalOperatorMap.put(operatorPath, extractedOperatorName);
     }
   }
+
+  public boolean showEstimatedRows() {
+    return showEstimatedRows;
+  }
 }
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 422d850..dd062ea 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -143,6 +143,7 @@ drill.exec: {
     }
     max_profiles: 100,
     profiles_per_page: [10, 25, 50, 100],
+    profile.statistics.estimated_rows.show : false,
     profile.warning: {
       progress.threshold: 300,
       time.skew: {
diff --git a/exec/java-exec/src/main/resources/rest/profile/profile.ftl b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
index 97f03e9..303d83d 100644
--- a/exec/java-exec/src/main/resources/rest/profile/profile.ftl
+++ b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
@@ -42,7 +42,7 @@
     };
 
     $(document).ready(function() {
-      $(".sortable").DataTable( {
+      $(".sortable").DataTable({
         "searching": false,
         "lengthChange": false,
         "paging": false,
@@ -84,6 +84,29 @@
         document.getElementById(warningElemId).style.display="none";
     }
 
+    //Injects Estimated Rows
+    function injectEstimatedRows() {
+      Object.keys(opRowCountMap).forEach(key => {
+        var tgtElem = $("td.estRowsAnchor[key='" + key + "']"); 
+        var status = tgtElem.append("<div class='estRows' title='Estimated'>(" + opRowCountMap[key] + ")</div>");
+      });
+    }
+
+    //Toggle Estimates' visibility
+    function toggleEstimates(tgtColumn) {
+      var colClass = '.est' + tgtColumn;
+      var estimates = $(colClass);
+      if (estimates.filter(":visible").length > 0) {
+        $(colClass).each(function () {
+          $(this).attr("style", "display:none");
+        });
+      } else {
+        $(colClass).each(function () {
+          $(this).attr("style", "display:block");
+        });
+      }
+    }
+
     //Close the cancellation status popup
     function refreshStatus() {
       //Close PopUp Modal
@@ -94,7 +117,7 @@
     //Cancel query & show cancellation status
     function cancelQuery() {
       document.getElementById("cancelTitle").innerHTML = "Drillbit on " + location.hostname + " says";
-      $.get("/profiles/cancel/"+globalconfig.queryid, function(data, status){/*Not Tracking Response*/});
+      $.get("/profiles/cancel/" + globalconfig.queryid, function(data, status){/*Not Tracking Response*/});
       //Show PopUp Modal
       $("#queryCancelModal").modal("show");
     };
@@ -389,8 +412,17 @@
   </div>
 
   <div class="page-header"></div>
-  <h3>Operator Profiles</h3>
-
+  <h3>Operator Profiles
+ <button onclick="toggleEstimates('Rows')" class="btn" style="font-size:60%; float:right">Show/Hide Estimated Rows</button></h3>
+
+ <style>
+  .estRows {
+    color:navy;
+    font-style:italic;
+    font-size: 80%;
+    display:<#if model.showEstimatedRows()>block<#else>none</#if>;
+  }
+</style>
   <div class="panel-group" id="operator-accordion">
     <div class="panel panel-default">
       <div class="panel-heading">
@@ -476,6 +508,9 @@
       injectIconByClass("spill-tag","glyphicon-download-alt");
       injectIconByClass("time-skew-tag","glyphicon-time");
       injectSlowScanIcon();
+      //Build the rowcount map and inject estimated rows
+      buildRowCountMap();
+      injectEstimatedRows();
     });
 
     //Inject Glyphicon by Class tag
@@ -485,7 +520,7 @@
         var i;
         for (i = 0; i < tagElemList.length; i++) {
             var content = tagElemList[i].innerHTML;
-            tagElemList[i].innerHTML = "<span class=\"glyphicon "+tagIcon+"\">&nbsp;</span>"+content;
+            tagElemList[i].innerHTML = "<span class=\"glyphicon " + tagIcon + "\">&nbsp;</span>" + content;
         }
     }
 
@@ -496,7 +531,7 @@
         var i;
         for (i = 0; i < tagElemList.length; i++) {
             var content = tagElemList[i].innerHTML;
-            tagElemList[i].innerHTML = "<img src='/static/img/turtle.png' alt='slow'> "+content;
+            tagElemList[i].innerHTML = "<img src='/static/img/turtle.png' alt='slow'> " + content;
         }
     }
 
@@ -567,7 +602,7 @@
     var popUpAndPrintPlan = function() {
       var srcSvg = $('#query-visual-canvas');
       var screenRatio=0.9;
-      let printWindow = window.open('', 'PlanPrint', 'width=' + (screenRatio*screen.width) + ',height=' + (screenRatio*screen.height) );
+      let printWindow = window.open('', 'PlanPrint', 'width=' + (screenRatio*screen.width) + ',height=' + (screenRatio*screen.height));
       printWindow.document.writeln($(srcSvg).parent().html());
       printWindow.print();
     };
@@ -587,6 +622,21 @@
       if (e.target.form) 
         <#if model.isOnlyImpersonationEnabled()>doSubmitQueryWithUserName()<#else>doSubmitQueryWithAutoLimit()</#if>;
     });
+
+    // Extract estimated rowcount map
+    var opRowCountMap = {};
+    // Get OpId-Rowcount map
+    function buildRowCountMap() {
+      var phyText = $('#query-physical').find('pre').text();
+      var opLines = phyText.split("\n");
+      opLines.forEach(line => {
+        if (line.trim().length > 0) {
+          var opId = line.match(/\d+-\d+/g)[0];
+          var opRowCount = line.match(/rowcount = ([^,]+)/)[1];
+          opRowCountMap[opId] = Number(opRowCount).toLocaleString('en');
+        }
+      });
+    }
     </script>
 
 </#macro>