Posted to commits@drill.apache.org by ar...@apache.org on 2018/07/04 14:20:00 UTC

[drill] branch master updated (cacca92 -> 96fa539)

This is an automated email from the ASF dual-hosted git repository.

arina pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from cacca92  DRILL-6546: Allow unnest function with nested columns and complex expressions
     new e047a2b  DRILL-6557: Use size in bytes during Hive statistics calculation if present
     new 5c9303d  DRILL-6553: Fix TopN for unnest operator
     new 132a05e  DRILL-6577: Change Hash-Join fallback default to false
     new 96fa539  DRILL-6534: Upgrade ZooKeeper patch version to 3.4.12 and add Apache Curator to dependencyManagement

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../exec/store/hive/HiveMetadataProvider.java      | 232 +++++++++++----------
 contrib/storage-kafka/pom.xml                      |   8 -
 drill-yarn/pom.xml                                 |  30 ++-
 exec/java-exec/pom.xml                             |  39 ++--
 .../planner/common/DrillLateralJoinRelBase.java    |  15 +-
 .../java-exec/src/main/resources/drill-module.conf |   2 +-
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |   9 +-
 .../impl/lateraljoin/TestLateralPlans.java         |  33 +++
 exec/jdbc-all/pom.xml                              |   1 +
 pom.xml                                            |  71 ++++++-
 10 files changed, 267 insertions(+), 173 deletions(-)


[drill] 01/04: DRILL-6557: Use size in bytes during Hive statistics calculation if present

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit e047a2bf79df5b2dc4d3abc00c79c6a53903522f
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Fri Jun 29 18:25:59 2018 +0300

    DRILL-6557: Use size in bytes during Hive statistics calculation if present
    
    1. Check for size in bytes in the stats before fetching input splits and use it if present.
    2. Add a trace log suggesting the ANALYZE command be run on the Hive table when statistics are unavailable and Drill had to fetch all input splits.
    3. Minor refactoring / cleanup in the HiveMetadataProvider class.
    
    closes #1357
---
 .../exec/store/hive/HiveMetadataProvider.java      | 232 +++++++++++----------
 1 file changed, 123 insertions(+), 109 deletions(-)

diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
index 6da6c40..de45dc6 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.store.hive;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.TreeMultimap;
@@ -50,12 +48,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * Class which provides methods to get metadata of given Hive table selection. It tries to use the stats stored in
@@ -79,50 +79,52 @@ public class HiveMetadataProvider {
   public HiveMetadataProvider(final String userName, final HiveReadEntry hiveReadEntry, final HiveConf hiveConf) {
     this.hiveReadEntry = hiveReadEntry;
     this.ugi = ImpersonationUtil.createProxyUgi(userName);
-    isPartitionedTable = hiveReadEntry.getTable().getPartitionKeysSize() > 0;
-    partitionInputSplitMap = Maps.newHashMap();
+    this.isPartitionedTable = hiveReadEntry.getTable().getPartitionKeysSize() > 0;
+    this.partitionInputSplitMap = new HashMap<>();
     this.hiveConf = hiveConf;
   }
 
   /**
-   * Return stats for table/partitions in given {@link HiveReadEntry}. If valid stats are available in MetaStore,
-   * return it. Otherwise estimate using the size of the input data.
+   * Return stats for table/partitions in given {@link HiveReadEntry}.
+   * If valid stats are available in MetaStore, return it.
+   * Otherwise estimate using the size of the input data.
    *
    * @param hiveReadEntry Subset of the {@link HiveReadEntry} used when creating this cache object.
-   * @return
-   * @throws IOException
+   * @return hive statistics holder
+   * @throws IOException if was unable to retrieve table statistics
    */
   public HiveStats getStats(final HiveReadEntry hiveReadEntry) throws IOException {
-    final Stopwatch timeGetStats = Stopwatch.createStarted();
+    Stopwatch timeGetStats = Stopwatch.createStarted();
 
-    final HiveTableWithColumnCache table = hiveReadEntry.getTable();
+    HiveTableWithColumnCache table = hiveReadEntry.getTable();
     try {
       if (!isPartitionedTable) {
-        final Properties properties = MetaStoreUtils.getTableMetadata(table);
-        final HiveStats stats = getStatsFromProps(properties);
+        Properties properties = MetaStoreUtils.getTableMetadata(table);
+        HiveStats stats = HiveStats.getStatsFromProps(properties);
         if (stats.valid()) {
           return stats;
         }
 
-        // estimate the stats from the InputSplits.
-        return getStatsEstimateFromInputSplits(getTableInputSplits());
+        return stats.getSizeInBytes() > 0 ? estimateStatsFromBytes(stats.getSizeInBytes()) :
+            estimateStatsFromInputSplits(getTableInputSplits());
+
       } else {
-        final HiveStats aggStats = new HiveStats(0, 0);
-        for(HivePartition partition : hiveReadEntry.getPartitions()) {
-          final Properties properties = HiveUtilities.getPartitionMetadata(partition, table);
-          HiveStats stats = getStatsFromProps(properties);
+        HiveStats aggStats = new HiveStats(0, 0);
+        for (HivePartition partition : hiveReadEntry.getPartitions()) {
+          Properties properties = HiveUtilities.getPartitionMetadata(partition, table);
+          HiveStats stats = HiveStats.getStatsFromProps(properties);
 
           if (!stats.valid()) {
-            // estimate the stats from InputSplits
-            stats = getStatsEstimateFromInputSplits(getPartitionInputSplits(partition));
+            stats = stats.getSizeInBytes() > 0 ? estimateStatsFromBytes(stats.getSizeInBytes()) :
+                estimateStatsFromInputSplits(getPartitionInputSplits(partition));
           }
           aggStats.add(stats);
         }
 
         return aggStats;
       }
-    } catch (final Exception e) {
-      throw new IOException("Failed to get numRows from HiveTable", e);
+    } catch (Exception e) {
+      throw new IOException("Failed to get number of rows and total size from HiveTable", e);
     } finally {
       logger.debug("Took {} µs to get stats from {}.{}", timeGetStats.elapsed(TimeUnit.NANOSECONDS) / 1000,
           table.getDbName(), table.getTableName());
@@ -130,7 +132,7 @@ public class HiveMetadataProvider {
   }
 
   /** Helper method which return InputSplits for non-partitioned table */
-  private List<LogicalInputSplit> getTableInputSplits() throws Exception {
+  private List<LogicalInputSplit> getTableInputSplits() {
     Preconditions.checkState(!isPartitionedTable, "Works only for non-partitioned tables");
     if (tableInputSplits != null) {
       return tableInputSplits;
@@ -145,7 +147,7 @@ public class HiveMetadataProvider {
   /** Helper method which returns the InputSplits for given partition. InputSplits are cached to speed up subsequent
    * metadata cache requests for the same partition(s).
    */
-  private List<LogicalInputSplit> getPartitionInputSplits(final HivePartition partition) throws Exception {
+  private List<LogicalInputSplit> getPartitionInputSplits(final HivePartition partition) {
     if (partitionInputSplitMap.containsKey(partition)) {
       return partitionInputSplitMap.get(partition);
     }
@@ -164,18 +166,17 @@ public class HiveMetadataProvider {
    * @param hiveReadEntry Subset of the {@link HiveReadEntry} used when creating this object.
    * @return list of logically grouped input splits
    */
-  public List<LogicalInputSplit> getInputSplits(final HiveReadEntry hiveReadEntry) {
-    final Stopwatch timeGetSplits = Stopwatch.createStarted();
+  public List<LogicalInputSplit> getInputSplits(HiveReadEntry hiveReadEntry) {
+    Stopwatch timeGetSplits = Stopwatch.createStarted();
     try {
       if (!isPartitionedTable) {
         return getTableInputSplits();
       }
 
-      final List<LogicalInputSplit> splits = Lists.newArrayList();
-      for (HivePartition p : hiveReadEntry.getPartitions()) {
-        splits.addAll(getPartitionInputSplits(p));
-      }
-      return splits;
+      return hiveReadEntry.getPartitions().stream()
+          .flatMap(p -> getPartitionInputSplits(p).stream())
+          .collect(Collectors.toList());
+
     } catch (final Exception e) {
       logger.error("Failed to get InputSplits", e);
       throw new DrillRuntimeException("Failed to get InputSplits", e);
@@ -190,63 +191,44 @@ public class HiveMetadataProvider {
    *
    * @param hiveReadEntry {@link HiveReadEntry} containing the input table and/or partitions.
    */
-  protected List<String> getInputDirectories(final HiveReadEntry hiveReadEntry) {
+  protected List<String> getInputDirectories(HiveReadEntry hiveReadEntry) {
     if (isPartitionedTable) {
-      final List<String> inputs = Lists.newArrayList();
-      for(Partition p : hiveReadEntry.getPartitions()) {
-        inputs.add(p.getSd().getLocation());
-      }
-      return inputs;
+      return hiveReadEntry.getPartitions().stream()
+          .map(p -> p.getSd().getLocation())
+          .collect(Collectors.toList());
     }
 
     return Collections.singletonList(hiveReadEntry.getTable().getSd().getLocation());
   }
 
   /**
-   * Get the stats from table properties. If not found -1 is returned for each stats field.
-   * CAUTION: stats may not be up-to-date with the underlying data. It is always good to run the ANALYZE command on
-   * Hive table to have up-to-date stats.
+   * Estimate the stats from the given list of logically grouped input splits.
    *
-   * @param properties the source of table stats
-   * @return {@link HiveStats} instance with rows number and size in bytes from specified properties
+   * @param inputSplits list of logically grouped input splits
+   * @return hive stats with numRows and totalSizeInBytes
    */
-  private HiveStats getStatsFromProps(final Properties properties) {
-    long numRows = -1;
-    long sizeInBytes = -1;
-    try {
-      final String numRowsProp = properties.getProperty(StatsSetupConst.ROW_COUNT);
-      if (numRowsProp != null) {
-          numRows = Long.valueOf(numRowsProp);
-      }
-
-      final String sizeInBytesProp = properties.getProperty(StatsSetupConst.TOTAL_SIZE);
-      if (sizeInBytesProp != null) {
-        sizeInBytes = Long.valueOf(sizeInBytesProp);
-      }
-    } catch (final NumberFormatException e) {
-      logger.error("Failed to parse Hive stats in metastore.", e);
-      // continue with the defaults.
+  private HiveStats estimateStatsFromInputSplits(List<LogicalInputSplit> inputSplits) throws IOException {
+    logger.trace("Collecting stats based on input splits size. " +
+        "It means that we might have fetched all input splits before applying any possible optimizations (ex: partition pruning). " +
+        "Consider using ANALYZE command on Hive table to collect statistics before running queries.");
+    long sizeInBytes = 0;
+    for (LogicalInputSplit split : inputSplits) {
+      sizeInBytes += split.getLength();
     }
-
-    return new HiveStats(numRows, sizeInBytes);
+    return estimateStatsFromBytes(sizeInBytes);
   }
 
   /**
-   * Estimate the stats from the given list of logically grouped input splits.
+   * Estimates Hive stats based on give size in bytes.
    *
-   * @param inputSplits list of logically grouped input splits
-   * @return hive stats usually numRows and totalSizeInBytes which used
+   * @param sizeInBytes size in bytes
+   * @return hive stats with numRows and totalSizeInBytes
    */
-  private HiveStats getStatsEstimateFromInputSplits(final List<LogicalInputSplit> inputSplits) throws IOException {
-    long data = 0;
-    for (final LogicalInputSplit split : inputSplits) {
-      data += split.getLength();
-    }
-
-    long numRows = data / RECORD_SIZE;
+  private HiveStats estimateStatsFromBytes(long sizeInBytes) {
+    long numRows = sizeInBytes / RECORD_SIZE;
     // if the result of division is zero and data size > 0, estimate to one row
-    numRows = numRows == 0 && data > 0 ? 1 : numRows;
-    return new HiveStats(numRows, data);
+    numRows = numRows == 0 && sizeInBytes > 0 ? 1 : numRows;
+    return new HiveStats(numRows, sizeInBytes);
   }
 
   /**
@@ -262,36 +244,34 @@ public class HiveMetadataProvider {
   private List<LogicalInputSplit> splitInputWithUGI(final Properties properties, final StorageDescriptor sd, final Partition partition) {
     watch.start();
     try {
-      return ugi.doAs(new PrivilegedExceptionAction<List<LogicalInputSplit>>() {
-        public List<LogicalInputSplit> run() throws Exception {
-          final List<LogicalInputSplit> splits = Lists.newArrayList();
-          final JobConf job = new JobConf(hiveConf);
-          HiveUtilities.addConfToJob(job, properties);
-          HiveUtilities.verifyAndAddTransactionalProperties(job, sd);
-          job.setInputFormat(HiveUtilities.getInputFormatClass(job, sd, hiveReadEntry.getTable()));
-          final Path path = new Path(sd.getLocation());
-          final FileSystem fs = path.getFileSystem(job);
-          if (fs.exists(path)) {
-            FileInputFormat.addInputPath(job, path);
-            final InputFormat<?, ?> format = job.getInputFormat();
-            InputSplit[] inputSplits = format.getSplits(job, 1);
-
-            // if current table with text input format and has header / footer,
-            // we need to make sure that splits of the same file are grouped together
-            if (TextInputFormat.class.getCanonicalName().equals(sd.getInputFormat()) &&
-                HiveUtilities.hasHeaderOrFooter(hiveReadEntry.getTable())) {
-              Multimap<Path, FileSplit> inputSplitMultimap = transformFileSplits(inputSplits);
-              for (Collection<FileSplit> logicalInputSplit : inputSplitMultimap.asMap().values()) {
-                splits.add(new LogicalInputSplit(logicalInputSplit, partition));
-              }
-            } else {
-              for (final InputSplit split : inputSplits) {
-                splits.add(new LogicalInputSplit(split, partition));
-              }
+      return ugi.doAs((PrivilegedExceptionAction<List<LogicalInputSplit>>) () -> {
+        final List<LogicalInputSplit> splits = new ArrayList<>();
+        final JobConf job = new JobConf(hiveConf);
+        HiveUtilities.addConfToJob(job, properties);
+        HiveUtilities.verifyAndAddTransactionalProperties(job, sd);
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, sd, hiveReadEntry.getTable()));
+        final Path path = new Path(sd.getLocation());
+        final FileSystem fs = path.getFileSystem(job);
+        if (fs.exists(path)) {
+          FileInputFormat.addInputPath(job, path);
+          final InputFormat<?, ?> format = job.getInputFormat();
+          InputSplit[] inputSplits = format.getSplits(job, 1);
+
+          // if current table with text input format and has header / footer,
+          // we need to make sure that splits of the same file are grouped together
+          if (TextInputFormat.class.getCanonicalName().equals(sd.getInputFormat()) &&
+              HiveUtilities.hasHeaderOrFooter(hiveReadEntry.getTable())) {
+            Multimap<Path, FileSplit> inputSplitMultimap = transformFileSplits(inputSplits);
+            for (Collection<FileSplit> logicalInputSplit : inputSplitMultimap.asMap().values()) {
+              splits.add(new LogicalInputSplit(logicalInputSplit, partition));
+            }
+          } else {
+            for (final InputSplit split : inputSplits) {
+              splits.add(new LogicalInputSplit(split, partition));
             }
           }
-          return splits;
         }
+        return splits;
       });
     } catch (final InterruptedException | IOException e) {
       final String errMsg = String.format("Failed to create input splits: %s", e.getMessage());
@@ -320,13 +300,8 @@ public class HiveMetadataProvider {
    * @return multimap where key is file path and value is group of ordered file splits
    */
   private Multimap<Path, FileSplit> transformFileSplits(InputSplit[] inputSplits) {
-    Multimap<Path, FileSplit> inputSplitGroups = TreeMultimap.create(Ordering.<Path>natural(),
-        new Comparator<FileSplit>() {
-      @Override
-      public int compare(FileSplit f1, FileSplit f2) {
-        return Long.compare(f1.getStart(), f2.getStart());
-      }
-    });
+    Multimap<Path, FileSplit> inputSplitGroups = TreeMultimap.create(
+        Ordering.natural(), Comparator.comparingLong(FileSplit::getStart));
 
     for (InputSplit inputSplit : inputSplits) {
       FileSplit fileSplit = (FileSplit) inputSplit;
@@ -413,16 +388,53 @@ public class HiveMetadataProvider {
     }
   }
 
-  /** Contains stats. Currently only numRows and totalSizeInBytes are used. */
+  /**
+   * Contains stats. Currently only numRows and totalSizeInBytes are used.
+   */
   public static class HiveStats {
+
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStats.class);
+
     private long numRows;
     private long sizeInBytes;
 
-    public HiveStats(final long numRows, final long sizeInBytes) {
+    public HiveStats(long numRows, long sizeInBytes) {
       this.numRows = numRows;
       this.sizeInBytes = sizeInBytes;
     }
 
+    /**
+     * Get the stats from table properties. If not found -1 is returned for each stats field.
+     * CAUTION: stats may not be up-to-date with the underlying data. It is always good to run the ANALYZE command on
+     * Hive table to have up-to-date stats.
+     *
+     * @param properties the source of table stats
+     * @return {@link HiveStats} instance with rows number and size in bytes from specified properties
+     */
+    public static HiveStats getStatsFromProps(Properties properties) {
+      long numRows = -1;
+      long sizeInBytes = -1;
+      try {
+        String sizeInBytesProp = properties.getProperty(StatsSetupConst.TOTAL_SIZE);
+        if (sizeInBytesProp != null) {
+          sizeInBytes = Long.valueOf(sizeInBytesProp);
+        }
+
+        String numRowsProp = properties.getProperty(StatsSetupConst.ROW_COUNT);
+        if (numRowsProp != null) {
+          numRows = Long.valueOf(numRowsProp);
+        }
+      } catch (NumberFormatException e) {
+        logger.error("Failed to parse Hive stats from metastore.", e);
+        // continue with the defaults.
+      }
+
+      HiveStats hiveStats = new HiveStats(numRows, sizeInBytes);
+      logger.trace("Obtained Hive stats from properties: {}.", hiveStats);
+      return hiveStats;
+    }
+
+
     public long getNumRows() {
       return numRows;
     }
@@ -431,7 +443,9 @@ public class HiveMetadataProvider {
       return sizeInBytes;
     }
 
-    /** Both numRows and sizeInBytes are expected to be greater than 0 for stats to be valid */
+    /**
+     * Both numRows and sizeInBytes are expected to be greater than 0 for stats to be valid
+     */
     public boolean valid() {
       return numRows > 0 && sizeInBytes > 0;
     }
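
For anyone skimming the diff above, the core of DRILL-6557 is the order in which getStats()
now resolves statistics: metastore stats if valid, otherwise an estimate from the totalSize
property alone, and only as a last resort a full listing of input splits. Below is a minimal,
self-contained sketch of that cascade; it mirrors the patch logic but uses plain longs and an
assumed RECORD_SIZE value instead of the real HiveStats / LogicalInputSplit types.

  import java.util.List;

  public class StatsCascadeSketch {
    static final int RECORD_SIZE = 1024;          // assumed bytes-per-row heuristic

    /** numRowsFromProps / sizeFromProps stand in for what getStatsFromProps() reads from the
     *  metastore properties (-1 when absent); splitLengths stands in for getTableInputSplits(). */
    static long[] resolve(long numRowsFromProps, long sizeFromProps, List<Long> splitLengths) {
      if (numRowsFromProps > 0 && sizeFromProps > 0) {   // 1. valid() stats -> use as-is
        return new long[] {numRowsFromProps, sizeFromProps};
      }
      if (sizeFromProps > 0) {                           // 2. only totalSize known -> estimate rows,
        return estimateFromBytes(sizeFromProps);         //    no input split listing needed
      }
      long size = 0;                                     // 3. last resort: sum all input split sizes
      for (long len : splitLengths) {                    //    (the case the new trace log flags,
        size += len;                                     //     suggesting ANALYZE on the Hive table)
      }
      return estimateFromBytes(size);
    }

    static long[] estimateFromBytes(long sizeInBytes) {
      long numRows = sizeInBytes / RECORD_SIZE;
      numRows = (numRows == 0 && sizeInBytes > 0) ? 1 : numRows;  // at least one row for non-empty data
      return new long[] {numRows, sizeInBytes};
    }
  }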


[drill] 03/04: DRILL-6577: Change Hash-Join fallback default to false

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 132a05ecec9d65c00bcce20678f29228abaea418
Author: Ben-Zvi <bb...@mapr.com>
AuthorDate: Mon Jul 2 18:30:47 2018 -0700

    DRILL-6577: Change Hash-Join fallback default to false
    
    closes #1359
---
 exec/java-exec/src/main/resources/drill-module.conf | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 6889a2f..4b1e9dd 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -427,7 +427,7 @@ drill.exec.options: {
     # Setting to control if HashJoin should fallback to older behavior of consuming
     # unbounded memory. By default it's set to false such that the
     # query will fail if there is not enough memory
-    drill.exec.hashjoin.fallback.enabled: true, # should soon be changed to false !!
+    drill.exec.hashjoin.fallback.enabled: false
     # Setting to control if HashAgg should fallback to older behavior of consuming
     # unbounded memory. In case of 2 phase Agg when available memory is not enough
     # to start at least 2 partitions then HashAgg fallbacks to this case. It can be
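
The practical effect of this default flip is that a Hash-Join which cannot fit its memory budget
now fails the query instead of silently consuming unbounded memory. Users who relied on the old
behavior can still opt back in per session. A hedged sketch over JDBC follows — the option name is
copied from the hunk above, while the connection URL and driver setup are assumptions about a local
Drillbit, not part of this commit:

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.Statement;

  public class EnableHashJoinFallback {
    public static void main(String[] args) throws Exception {
      // Assumes a Drillbit on localhost and the Drill JDBC driver on the classpath.
      try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
           Statement stmt = conn.createStatement()) {
        // Re-enable the pre-DRILL-6577 unbounded-memory behavior for this session only.
        stmt.execute("ALTER SESSION SET `drill.exec.hashjoin.fallback.enabled` = true");
      }
    }
  }

The same statement with ALTER SYSTEM would restore the old default cluster-wide, but the per-session
form keeps the safer failure mode for everyone else.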


[drill] 04/04: DRILL-6534: Upgrade ZooKeeper patch version to 3.4.12 and add Apache Curator to dependencyManagement

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 96fa53900ab470b89cf7d5b812d43f61db6eb056
Author: Bohdan Kazydub <bo...@gmail.com>
AuthorDate: Tue Jun 26 12:30:53 2018 +0300

    DRILL-6534: Upgrade ZooKeeper patch version to 3.4.12 and add Apache Curator to dependencyManagement
    
    closes #1337
---
 contrib/storage-kafka/pom.xml |  8 -----
 drill-yarn/pom.xml            | 30 ++++++++----------
 exec/java-exec/pom.xml        | 39 ++++++++----------------
 exec/jdbc-all/pom.xml         |  1 +
 pom.xml                       | 71 +++++++++++++++++++++++++++++++++++++++++--
 5 files changed, 94 insertions(+), 55 deletions(-)

diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml
index 0adcfb9..5e2809f 100644
--- a/contrib/storage-kafka/pom.xml
+++ b/contrib/storage-kafka/pom.xml
@@ -88,14 +88,6 @@
     <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-test</artifactId>
-      <version>${curator.test.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>log4j</groupId>
-          <artifactId>log4j</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.drill</groupId>
diff --git a/drill-yarn/pom.xml b/drill-yarn/pom.xml
index b6ada46..a2238c6 100644
--- a/drill-yarn/pom.xml
+++ b/drill-yarn/pom.xml
@@ -98,8 +98,19 @@
     </dependency>
     <dependency>
       <groupId>org.apache.curator</groupId>
+      <artifactId>curator-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-x-discovery</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
       <artifactId>curator-test</artifactId>
-      <scope>test</scope>
     </dependency>
 
     <!-- For configuration -->
@@ -125,21 +136,4 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
-
-  <dependencyManagement>
-   <dependencies>
-	   <dependency>
-	      <groupId>org.apache.curator</groupId>
-	      <artifactId>curator-test</artifactId>
-	      <version>${curator.test.version}</version>
-	      <scope>test</scope>
-	      <exclusions>
-          <exclusion>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-          </exclusion>
-	      </exclusions>
-	    </dependency>
-    </dependencies>
-  </dependencyManagement>
 </project>
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 94c3459..6d57ba3 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -312,38 +312,23 @@
     </dependency>
     <dependency>
       <groupId>org.apache.curator</groupId>
+      <artifactId>curator-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
       <artifactId>curator-x-discovery</artifactId>
-      <version>2.7.1</version>
-      <exclusions>
-        <exclusion>
-          <artifactId>slf4j-log4j12</artifactId>
-          <groupId>org.slf4j</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>log4j</artifactId>
-          <groupId>log4j</groupId>
-        </exclusion>
-        <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
-        </exclusion>
-        <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-all</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-test</artifactId>
-      <version>${curator.test.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.carrotsearch</groupId>
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index d8423de..f3595be 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -636,6 +636,7 @@
                     <exclude>org.objenesis:objenesis</exclude>
                     <exclude>org.eclipse.jetty:*</exclude>
                     <exclude>org.apache.hadoop:*</exclude>
+                    <exclude>org.jboss.netty:netty</exclude>
                     <exclude>javax.xml.bind:jaxb-api</exclude>
                     <exclude>javax.xml.stream:stax-api</exclude>
                     <exclude>javax.activation:activation</exclude>
diff --git a/pom.xml b/pom.xml
index e3391b5..56582c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,7 @@
     <sqlline.version>1.1.9-drill-r7</sqlline.version>
     <jackson.version>2.9.5</jackson.version>
     <jackson.databind.version>2.9.5</jackson.databind.version>
+    <zookeeper.version>3.4.12</zookeeper.version>
     <mapr.release.version>5.2.1-mapr</mapr.release.version>
     <ojai.version>1.1</ojai.version>
     <kerby.version>1.0.0-RC2</kerby.version>
@@ -60,7 +61,7 @@
     <junit.version>4.11</junit.version>
     <hamcrest.core.version>1.3</hamcrest.core.version>
     <maven.embedder.version>3.5.3</maven.embedder.version>
-    <curator.test.version>2.7.1</curator.test.version>
+    <curator.version>2.7.1</curator.version>
     <parquet.hadoop.version>1.8.1</parquet.hadoop.version>
     <wiremock.standalone.version>2.5.1</wiremock.standalone.version>
     <jmockit.version>1.39</jmockit.version>
@@ -244,7 +245,7 @@
             <dependency>
               <groupId>org.apache.curator</groupId>
               <artifactId>curator-test</artifactId>
-              <version>${curator.test.version}</version>
+              <version>${curator.version}</version>
             </dependency>
             <dependency>
               <groupId>org.apache.hadoop</groupId>
@@ -1555,6 +1556,71 @@
         <artifactId>config</artifactId>
         <version>1.0.0</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.zookeeper</groupId>
+        <artifactId>zookeeper</artifactId>
+        <version>${zookeeper.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-client</artifactId>
+        <version>${curator.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-framework</artifactId>
+        <version>${curator.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-recipes</artifactId>
+        <version>${curator.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-x-discovery</artifactId>
+        <version>${curator.version}</version>
+        <exclusions>
+          <exclusion>
+            <artifactId>slf4j-log4j12</artifactId>
+            <groupId>org.slf4j</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>log4j</artifactId>
+            <groupId>log4j</groupId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-test</artifactId>
+        <version>${curator.version}</version>
+        <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
@@ -2359,6 +2425,7 @@
         <hive.version>2.1.1-mapr-1710</hive.version>
         <hbase.version>1.1.1-mapr-1602-m7-5.2.0</hbase.version>
         <hadoop.version>2.7.0-mapr-1707</hadoop.version>
+        <zookeeper.version>3.4.5-mapr-1710</zookeeper.version>
       </properties>
       <dependencyManagement>
         <dependencies>
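
With every Curator artifact now pinned through the parent dependencyManagement (and the log4j /
netty exclusions kept in one place), child modules only name the artifact they need, as the
drill-yarn and java-exec pom changes above show. As a quick, hedged smoke test that the managed
Curator version still talks to ZooKeeper 3.4.12 — a standalone sketch with an assumed local
ZooKeeper and retry policy, not code from this commit:

  import org.apache.curator.framework.CuratorFramework;
  import org.apache.curator.framework.CuratorFrameworkFactory;
  import org.apache.curator.retry.RetryNTimes;

  public class CuratorSmokeTest {
    public static void main(String[] args) throws Exception {
      CuratorFramework client = CuratorFrameworkFactory.builder()
          .connectString("localhost:2181")         // assumed local ZooKeeper 3.4.12
          .retryPolicy(new RetryNTimes(3, 1000))   // 3 retries, 1 second apart
          .build();
      client.start();
      try {
        System.out.println("children of /: " + client.getChildren().forPath("/"));
      } finally {
        client.close();
      }
    }
  }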


[drill] 02/04: DRILL-6553: Fix TopN for unnest operator

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 5c9303d7e3797377191ff9a57103ef033e023db1
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Thu Jun 28 20:20:38 2018 +0300

    DRILL-6553: Fix TopN for unnest operator
    
    closes #1353
---
 .../planner/common/DrillLateralJoinRelBase.java    | 15 ++++++----
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |  9 ++++--
 .../impl/lateraljoin/TestLateralPlans.java         | 33 ++++++++++++++++++++++
 3 files changed, 49 insertions(+), 8 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
index 2f895e2..5a2b40e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
@@ -50,15 +50,15 @@ public abstract class DrillLateralJoinRelBase extends Correlate implements Drill
     this.excludeCorrelateColumn = excludeCorrelateCol;
   }
 
-  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
-                                              RelMetadataQuery mq) {
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
     DrillCostBase.DrillCostFactory costFactory = (DrillCostBase.DrillCostFactory) planner.getCostFactory();
 
-    double rowCount = mq.getRowCount(this.getLeft());
+    double rowCount = estimateRowCount(mq);
     long fieldWidth = PrelUtil.getPlannerSettings(planner).getOptions()
-        .getOption(ExecConstants.AVERAGE_FIELD_WIDTH_KEY).num_val;
+        .getLong(ExecConstants.AVERAGE_FIELD_WIDTH_KEY);
 
-    double rowSize = (this.getLeft().getRowType().getFieldList().size()) * fieldWidth;
+    double rowSize = left.getRowType().getFieldList().size() * fieldWidth;
 
     double cpuCost = rowCount * rowSize * DrillCostBase.BASE_CPU_COST;
     double memCost = !excludeCorrelateColumn ? CORRELATE_MEM_COPY_COST : 0.0;
@@ -117,4 +117,9 @@ public abstract class DrillLateralJoinRelBase extends Correlate implements Drill
     }
     return inputRowType;
   }
+
+  @Override
+  public double estimateRowCount(RelMetadataQuery mq) {
+    return mq.getRowCount(left);
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index 98f051e..c57093c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -18,8 +18,10 @@
 package org.apache.drill.exec.physical.impl.lateraljoin;
 
 import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.TestBuilder;
 import org.junit.BeforeClass;
@@ -43,8 +45,10 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   public static void setupTestFiles() throws Exception {
     dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", regularTestFile_1));
     dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", regularTestFile_2));
-    startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));
-    test("alter session set `planner.enable_unnest_lateral`=%s", true);
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .sessionOption(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+        .maxParallelization(1);
+    startCluster(builder);
   }
 
   /***********************************************************************************************
@@ -97,7 +101,6 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   /**
    * Test which disables the TopN operator from planner settings before running query using SORT and LIMIT in
    * subquery. The same query as in above test is executed and same result is expected.
-   * @throws Exception
    */
   @Test
   public void testLateral_WithSortAndLimitInSubQuery() throws Exception {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
index 8ff164f..77d245f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
@@ -17,8 +17,11 @@
  */
 package org.apache.drill.exec.physical.impl.lateraljoin;
 
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.PlanTestBase;
@@ -499,4 +502,34 @@ public class TestLateralPlans extends BaseTestQuery {
       assertTrue(matcher.find());
     }
   }
+
+  @Test
+  public void testUnnestTopN() throws Exception {
+    String query =
+        "select customer.c_custkey," +
+                "customer.c_name," +
+                "t.o.o_orderkey," +
+                "t.o.o_totalprice\n" +
+        "from dfs.`lateraljoin/multipleFiles` customer," +
+              "unnest(customer.c_orders) t(o)\n" +
+        "order by customer.c_custkey," +
+                  "t.o.o_orderkey," +
+                  "t.o.o_totalprice\n" +
+        "limit 50";
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String plan = client.queryBuilder()
+          .sql(query)
+          .explainText();
+
+      assertThat("Query plan doesn't contain TopN operator",
+          plan, containsString("TopN(limit=[50])"));
+      assertThat("Query plan shouldn't contain Sort operator",
+          plan, not(containsString("Sort")));
+    }
+  }
 }