Posted to commits@impala.apache.org by sa...@apache.org on 2016/12/05 22:24:59 UTC

[1/7] incubator-impala git commit: IMPALA-4172/IMPALA-3653: Improvements to block metadata loading

Repository: incubator-impala
Updated Branches:
  refs/heads/master b9034ea0d -> b8b64e110


IMPALA-4172/IMPALA-3653: Improvements to block metadata loading

This patch improves block metadata loading (block locations and disk
storage IDs) for partitioned and unpartitioned tables in the Catalog
server.

Without this patch:
------------------
We loop over every file in the table/partition directories and call
getFileBlockLocations() on each one to obtain its block metadata.
This results in a large number of RPC calls to the NameNode, especially
for tables with a large number of files/partitions.
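
For illustration, a minimal sketch of the old per-file pattern (the real
code being removed appears in the HdfsTable.java hunks further below;
'fs' and 'partDirPath' are placeholder names):

    // Old pattern: one NameNode RPC per file to fetch its block locations.
    for (FileStatus file : fs.listStatus(partDirPath)) {
      if (file.isDirectory()) continue;
      BlockLocation[] locations =
          fs.getFileBlockLocations(file, 0, file.getLen());
      // ... build a FileDescriptor and its FileBlocks from 'locations' ...
    }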

With this patch:
---------------
We move the block metadata querying to a recursive listing call
(FileSystem.listFiles(); see loadBlockMetadata() in the diff below) that
accepts a directory as input and fetches the 'BlockLocation' objects for
every file under that directory. This improves metadata loading in the
following ways (a simplified sketch of the listing pattern follows the
list).

- For non-partitioned tables, we query all the BlockLocations in a
single RPC on the base table directory and load the corresponding
disk IDs.

- For partitioned tables, we query the BlockLocations for all the
partitions residing under the base table directory in a single RPC,
and then load each partition with a non-default partition directory
separately.

- REFRESH on a table now reloads the block metadata from scratch for
every data file, every time. It can therefore be used in place of
INVALIDATE METADATA in situations like HDFS block rebalancing that
require a block metadata update.
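
A minimal sketch of the new listing pattern (the patch's full version is
HdfsTable.loadBlockMetadata() in the diff below; 'dirPath' and 'CONF' are
placeholder names):

    // New pattern: a single recursive listing returns each file's block
    // locations together with its FileStatus, avoiding per-file RPCs.
    FileSystem fs = dirPath.getFileSystem(CONF);
    RemoteIterator<LocatedFileStatus> iter = fs.listFiles(dirPath, true /*recursive*/);
    while (iter.hasNext()) {
      LocatedFileStatus fileStatus = iter.next();
      BlockLocation[] locations = fileStatus.getBlockLocations();
      // ... map fileStatus.getPath().getParent() to a partition and build its
      //     FileDescriptor and FileBlocks from 'locations' ...
    }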

Also, this patch does away with the VolumeIds returned by the HDFS
NameNode and uses the new storage IDs returned by the BlockLocation
class. These storage IDs are UUID strings and hence are mapped to a
per-node 0-based index, as expected by the backend. In upcoming
versions of the Hadoop APIs, getFileBlockStorageLocations() is
deprecated and the listing calls instead return BlockLocations with
storage IDs embedded. This patch makes use of that improvement to
eliminate an additional RPC to the NameNode to fetch the storage
locations.
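
A minimal sketch of how the embedded storage IDs are consumed (the patch's
real version is HdfsTable.loadDiskIds() in the diff below; this helper is
illustrative only):

    // Each BlockLocation now carries storage UUIDs alongside the replica
    // hosts, so no separate getFileBlockStorageLocations() RPC is needed.
    static int[] toDiskIds(BlockLocation location) throws IOException {
      String[] storageIds = location.getStorageIds(); // one UUID per replica
      String[] hosts = location.getHosts();           // parallel array of replica hosts
      int[] diskIds = new int[storageIds.length];
      for (int i = 0; i < storageIds.length; ++i) {
        // DiskIdMapper (added by this patch) maps the UUID to a per-host,
        // 0-based index; -1 marks an unknown disk.
        diskIds[i] = Strings.isNullOrEmpty(storageIds[i])
            ? -1 : DiskIdMapper.INSTANCE.getDiskId(hosts[i], storageIds[i]);
      }
      return diskIds;
    }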

Change-Id: Ie127658172e6e70dae441374530674a4ac9d5d26
Reviewed-on: http://gerrit.cloudera.org:8080/5148
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6662c553
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6662c553
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6662c553

Branch: refs/heads/master
Commit: 6662c55364b1c429340edc1ffd14323167f7b561
Parents: b9034ea
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Sun Nov 13 22:15:41 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Dec 3 21:17:46 2016 +0000

----------------------------------------------------------------------
 .../impala/catalog/CatalogServiceCatalog.java   |   2 +-
 .../org/apache/impala/catalog/DiskIdMapper.java |  88 +++
 .../apache/impala/catalog/HdfsPartition.java    |   7 +-
 .../org/apache/impala/catalog/HdfsTable.java    | 576 ++++++++-----------
 .../apache/impala/common/FileSystemUtil.java    |  30 +-
 .../impala/service/CatalogOpExecutor.java       |   3 +-
 6 files changed, 366 insertions(+), 340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6662c553/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 6fbcccc..85d92cb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -928,7 +928,7 @@ public class CatalogServiceCatalog extends Catalog {
           throw new TableLoadingException("Error loading metadata for table: " +
               db.getName() + "." + tblName.getTable_name(), e);
         }
-        tbl.load(true, msClient.getHiveClient(), msTbl);
+        tbl.load(false, msClient.getHiveClient(), msTbl);
       }
       tbl.setCatalogVersion(newCatalogVersion);
       return tbl;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6662c553/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java b/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java
new file mode 100644
index 0000000..6fdcef0
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.collect.Maps;
+import com.google.common.base.Strings;
+import com.google.common.base.Preconditions;
+
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A singleton class that maps HDFS storage-UUIDs to per-host 0-based, sequential disk
+ * ids. This mapping is internally implemented as a global static object shared
+ * across all the table instances. The rationale behind this implementation is
+ * - To maintain a consistent mapping across all the table instances so that the
+ *   assignment of scan ranges to I/O threads is balanced and consistent for all scans
+ *   on the same host.
+ * - To reduce memory usage in the Catalog since UUIDs can potentially consume a lot of
+ *   memory when maintained per table instance.
+ */
+public class DiskIdMapper {
+
+    public static DiskIdMapper INSTANCE = new DiskIdMapper();
+
+    private DiskIdMapper() {}
+
+    // Maps each storage ID UUID string returned by the BlockLocation API, to a per-node
+    // sequential 0-based integer disk id used by the BE scanners. This assumes that
+    // the storage ID of a particular disk is unique across all the nodes in the cluster.
+    private ConcurrentHashMap<String, Integer> storageUuidToDiskId =
+        new ConcurrentHashMap<String, Integer>();
+
+    // Per-host ID generator for storage UUID to integer ID mapping. This maps each host
+    // to the corresponding latest 0-based integer ID.
+    private HashMap<String, Integer> storageIdGenerator = Maps.newHashMap();
+
+    /**
+     * Returns a disk id (0-based) index for storageUuid on host 'host'. Generates a
+     * new disk ID for storageUuid if one doesn't already exist. We cache the mappings
+     * already generated for faster lookups.
+     *
+     * TODO: It is quite possible that there will be lock contention in this method during
+     * the initial metadata load. Figure out ways to fix it using a finer-grained locking scheme.
+     */
+    public int getDiskId(String host, String storageUuid) {
+      Preconditions.checkState(!Strings.isNullOrEmpty(host));
+      // Initialize the diskId as -1 to indicate it is unknown
+      int diskId = -1;
+      // Check if an existing mapping is already present. This is intentionally kept
+      // out of the synchronized block to avoid contention for lookups. Once a reasonable
+      // amount of data loading is done and storageUuidToDiskId is populated with storage IDs
+      // across the cluster, we expect to have a good hit rate.
+      Integer intId = storageUuidToDiskId.get(storageUuid);
+      if (intId != null) return intId;
+      synchronized (storageIdGenerator) {
+        // Mapping might have been added by another thread that entered the synchronized
+        // block first.
+        intId = storageUuidToDiskId.get(storageUuid);
+        if (intId != null) return intId;
+        // No mapping exists, create a new disk ID for 'storageUuid'
+        if (storageIdGenerator.containsKey(host)) {
+          diskId = storageIdGenerator.get(host) + 1;
+        } else {
+          // First diskId of this host.
+          diskId = 0;
+        }
+        storageIdGenerator.put(host, new Integer(diskId));
+        storageUuidToDiskId.put(storageUuid, new Integer(diskId));
+      }
+      return diskId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6662c553/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 8718419..c240613 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -107,7 +107,11 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
     }
 
     public void addFileBlock(FileBlock blockMd) {
-      fileDescriptor_.addToFile_blocks(blockMd.toThrift());
+      addThriftFileBlock(blockMd.toThrift());
+    }
+
+    public void addThriftFileBlock(THdfsFileBlock block) {
+      fileDescriptor_.addToFile_blocks(block);
     }
 
     public static FileDescriptor fromThrift(THdfsFileDesc desc) {
@@ -381,6 +385,7 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
   public String getLocation() {
     return (location_ != null) ? location_.toString() : null;
   }
+  public Path getLocationPath() { return new Path(getLocation()); }
   public long getId() { return id_; }
   public HdfsTable getTable() { return table_; }
   public void setNumRows(long numRows) { numRows_ = numRows; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6662c553/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 2a30cec..386ef79 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -36,8 +36,9 @@ import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.VolumeId;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -90,6 +91,7 @@ import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.TAccessLevelUtil;
 import org.apache.impala.util.TResultRowBuilder;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -177,8 +179,6 @@ public class HdfsTable extends Table {
   // data is cached or that all/any partitions are cached.
   private boolean isMarkedCached_ = false;
 
-  private static boolean hasLoggedDiskIdFormatWarning_ = false;
-
   // Array of sorted maps storing the association between partition values and
   // partition ids. There is one sorted map per partition key.
   // TODO: We should not populate this for HdfsTable objects stored in the catalog
@@ -213,10 +213,6 @@ public class HdfsTable extends Table {
 
   private HdfsPartitionLocationCompressor partitionLocationCompressor_;
 
-  // Map of file names to file descriptors for each partition location (directory).
-  private Map<String, Map<String, FileDescriptor>>
-      perPartitionFileDescMap_ = Maps.newHashMap();
-
   // Total number of Hdfs files in this table. Set in load().
   private long numHdfsFiles_;
 
@@ -249,48 +245,6 @@ public class HdfsTable extends Table {
   // and its usage in getFileSystem suggests it should be.
   private static final Configuration CONF = new Configuration();
 
-  private static final boolean SUPPORTS_VOLUME_ID;
-
-  // Wrapper around a FileSystem object to hash based on the underlying FileSystem's
-  // scheme and authority.
-  private static class FsKey {
-    FileSystem filesystem;
-
-    public FsKey(FileSystem fs) { filesystem = fs; }
-
-    @Override
-    public int hashCode() { return filesystem.getUri().hashCode(); }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) return true;
-      if (o != null && o instanceof FsKey) {
-        URI uri = filesystem.getUri();
-        URI otherUri = ((FsKey)o).filesystem.getUri();
-        return uri.equals(otherUri);
-      }
-      return false;
-    }
-
-    @Override
-    public String toString() { return filesystem.getUri().toString(); }
-  }
-
-  // Keeps track of newly added THdfsFileBlock metadata and its corresponding
-  // BlockLocation.  For each i, blocks.get(i) corresponds to locations.get(i).  Once
-  // all the new file blocks are collected, the disk volume IDs are retrieved in one
-  // batched DFS call.
-  private static class FileBlocksInfo {
-    final List<THdfsFileBlock> blocks = Lists.newArrayList();
-    final List<BlockLocation> locations = Lists.newArrayList();
-
-    public void addBlocks(List<THdfsFileBlock> b, List<BlockLocation> l) {
-      Preconditions.checkState(b.size() == l.size());
-      blocks.addAll(b);
-      locations.addAll(l);
-    }
-  }
-
   public HdfsTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
       Db db, String name, String owner) {
     super(msTbl, db, name, owner);
@@ -298,39 +252,6 @@ public class HdfsTable extends Table {
         new HdfsPartitionLocationCompressor(numClusteringCols_);
   }
 
-  static {
-    SUPPORTS_VOLUME_ID =
-        CONF.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
-                        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
-  }
-
-  /**
-   * Returns a disk id (0-based) index from the Hdfs VolumeId object.
-   * There is currently no public API to get at the volume id. We'll have to get it
-   * by accessing the internals.
-   */
-  private static int getDiskId(VolumeId hdfsVolumeId) {
-    // Initialize the diskId as -1 to indicate it is unknown
-    int diskId = -1;
-
-    if (hdfsVolumeId != null) {
-      // TODO: this is a hack and we'll have to address this by getting the
-      // public API. Also, we need to be very mindful of this when we change
-      // the version of HDFS.
-      String volumeIdString = hdfsVolumeId.toString();
-      // This is the hacky part. The toString is currently the underlying id
-      // encoded as hex.
-      byte[] volumeIdBytes = StringUtils.hexStringToByte(volumeIdString);
-      if (volumeIdBytes != null && volumeIdBytes.length == 4) {
-        diskId = Bytes.toInt(volumeIdBytes);
-      } else if (!hasLoggedDiskIdFormatWarning_) {
-        LOG.warn("wrong disk id format: " + volumeIdString);
-        hasLoggedDiskIdFormatWarning_ = true;
-      }
-    }
-    return diskId;
-  }
-
   public boolean spansMultipleFileSystems() { return multipleFileSystems_; }
 
   /**
@@ -360,74 +281,181 @@ public class HdfsTable extends Table {
   }
 
   /**
-   * Queries the filesystem to load the file block metadata (e.g. DFS blocks) for the
-   * given file.  Adds the newly created block metadata and block location to the
-   * perFsFileBlocks, so that the disk IDs for each block can be retrieved with one
-   * call to DFS.
+   * Drops and re-loads the block metadata for all partitions in 'partsByPath' whose
+   * location is under the given 'dirPath'. It involves the following steps:
+   * - Clear the current block metadata of the partitions.
+   * - Call FileSystem.listStatus() on 'dirPath' to fetch the BlockLocations for each
+   *   file under it recursively.
+   * - For every valid data file, map it to a partition from 'partsByPath' (if one exists)
+   *   and enumerate all its blocks and their corresponding hosts and disk IDs.
+   * TODO: Split this method into more logical methods for cleaner code.
    */
-  private void loadBlockMetadata(FileSystem fs, FileStatus file, FileDescriptor fd,
-      HdfsFileFormat fileFormat, Map<FsKey, FileBlocksInfo> perFsFileBlocks) {
-    Preconditions.checkNotNull(fd);
-    Preconditions.checkNotNull(perFsFileBlocks);
-    Preconditions.checkArgument(!file.isDirectory());
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("load block md for " + name_ + " file " + fd.getFileName());
-    }
-
-    if (!FileSystemUtil.hasGetFileBlockLocations(fs)) {
-      synthesizeBlockMetadata(fs, fd, fileFormat);
-      return;
-    }
+  private void loadBlockMetadata(Path dirPath,
+      HashMap<Path, List<HdfsPartition>> partsByPath) {
     try {
-      BlockLocation[] locations = fs.getFileBlockLocations(file, 0, file.getLen());
-      Preconditions.checkNotNull(locations);
-
-      // Loop over all blocks in the file.
-      for (BlockLocation loc: locations) {
-        Preconditions.checkNotNull(loc);
-        // Get the location of all block replicas in ip:port format.
-        String[] blockHostPorts = loc.getNames();
-        // Get the hostnames for all block replicas. Used to resolve which hosts
-        // contain cached data. The results are returned in the same order as
-        // block.getNames() so it allows us to match a host specified as ip:port to
-        // corresponding hostname using the same array index.
-        String[] blockHostNames = loc.getHosts();
-        Preconditions.checkState(blockHostNames.length == blockHostPorts.length);
-        // Get the hostnames that contain cached replicas of this block.
-        Set<String> cachedHosts =
-            Sets.newHashSet(Arrays.asList(loc.getCachedHosts()));
-        Preconditions.checkState(cachedHosts.size() <= blockHostNames.length);
-
-        // Now enumerate all replicas of the block, adding any unknown hosts
-        // to hostMap_/hostList_. The host ID (index in to the hostList_) for each
-        // replica is stored in replicaHostIdxs.
-        List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
-            blockHostPorts.length);
-        for (int i = 0; i < blockHostPorts.length; ++i) {
-          TNetworkAddress networkAddress = BlockReplica.parseLocation(blockHostPorts[i]);
-          Preconditions.checkState(networkAddress != null);
-          replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
-              cachedHosts.contains(blockHostNames[i])));
+      FileSystem fs = dirPath.getFileSystem(CONF);
+      // No need to load blocks for empty partitions list.
+      if (partsByPath.size() == 0 || !fs.exists(dirPath)) return;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Loading block md for " + name_ + " directory " + dirPath.toString());
+      }
+      // Clear the state of partitions under dirPath since they are now updated based
+      // on the current snapshot of files in the directory.
+      for (Map.Entry<Path, List<HdfsPartition>> entry: partsByPath.entrySet()) {
+        Path partDir = entry.getKey();
+        if (!FileSystemUtil.isDescendantPath(partDir, dirPath)) continue;
+        for (HdfsPartition partition: entry.getValue()) {
+          partition.setFileDescriptors(new ArrayList<FileDescriptor>());
         }
-        fd.addFileBlock(new FileBlock(loc.getOffset(), loc.getLength(), replicas));
       }
-      // Remember the THdfsFileBlocks and corresponding BlockLocations.  Once all the
-      // blocks are collected, the disk IDs will be queried in one batch per filesystem.
-      addPerFsFileBlocks(perFsFileBlocks, fs, fd.getFileBlocks(),
-          Arrays.asList(locations));
+      // For file systems that do not support BlockLocation API, we manually synthesize
+      // block location metadata based on file formats.
+      if (!FileSystemUtil.supportsStorageIds(fs)) {
+        synthesizeBlockMetadata(fs, dirPath, partsByPath);
+        return;
+      }
+      int unknownDiskIdCount = 0;
+      RemoteIterator<LocatedFileStatus> fileStatusIter = fs.listFiles(dirPath, true);
+      while (fileStatusIter.hasNext()) {
+        LocatedFileStatus fileStatus = fileStatusIter.next();
+        if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
+        // Find the partition that this file belongs (if any).
+        Path partPathDir = fileStatus.getPath().getParent();
+        Preconditions.checkNotNull(partPathDir);
+
+        List<HdfsPartition> partitions = partsByPath.get(partPathDir);
+        // Skip if this file does not belong to any known partition.
+        if (partitions == null) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("File " + fileStatus.getPath().toString() + " doesn't correspond " +
+                " to a known partition. Skipping metadata load for this file.");
+          }
+          continue;
+        }
+        String fileName = fileStatus.getPath().getName();
+        FileDescriptor fd = new FileDescriptor(fileName, fileStatus.getLen(),
+            fileStatus.getModificationTime());
+        BlockLocation[] locations = fileStatus.getBlockLocations();
+        String partPathDirName = partPathDir.toString();
+        for (BlockLocation loc: locations) {
+          Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts());
+          // Enumerate all replicas of the block, adding any unknown hosts
+          // to hostIndex_. We pick the network address from getNames() and
+          // map it to the corresponding hostname from getHosts().
+          List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
+              loc.getNames().length);
+          for (int i = 0; i < loc.getNames().length; ++i) {
+            TNetworkAddress networkAddress =
+                BlockReplica.parseLocation(loc.getNames()[i]);
+            replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
+                cachedHosts.contains(loc.getHosts()[i])));
+          }
+          FileBlock currentBlock =
+              new FileBlock(loc.getOffset(), loc.getLength(), replicas);
+          THdfsFileBlock tHdfsFileBlock = currentBlock.toThrift();
+          fd.addThriftFileBlock(tHdfsFileBlock);
+          unknownDiskIdCount += loadDiskIds(loc, tHdfsFileBlock);
+        }
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Adding file md dir: " + partPathDirName + " file: " + fileName);
+        }
+        // Update the partitions' metadata that this file belongs to.
+        for (HdfsPartition partition: partitions) {
+          partition.getFileDescriptors().add(fd);
+          numHdfsFiles_++;
+          totalHdfsBytes_ += fd.getFileLength();
+        }
+      }
+      if (unknownDiskIdCount > 0) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Unknown disk id count for filesystem " + fs + ":" +
+              unknownDiskIdCount);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Error loading block metadata for directory "
+          + dirPath.toString() + ": " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Loads the disk IDs for BlockLocation 'location' and its corresponding file block.
+   * HDFS API for BlockLocation returns a storageID UUID string for each disk
+   * hosting the block, which is then mapped to a 0-based integer id called disk ID.
+   * Returns the number of unknown disk IDs encountered in this process.
+   */
+  private int loadDiskIds(BlockLocation location, THdfsFileBlock fileBlock) {
+    int unknownDiskIdCount = 0;
+    String[] storageIds = location.getStorageIds();
+    String[] hosts;
+    try {
+      hosts = location.getHosts();
     } catch (IOException e) {
-      throw new RuntimeException("couldn't determine block locations for path '" +
-          file.getPath() + "':\n" + e.getMessage(), e);
+      LOG.error("Couldn't get hosts for block: " + location.toString(), e);
+      return unknownDiskIdCount;
+    }
+    if (storageIds.length != hosts.length) {
+      LOG.error("Number of storage IDs and number of hosts for block: " + location
+          .toString() + " mismatch. Skipping disk ID loading for this block.");
+      return unknownDiskIdCount;
     }
+    int[] diskIDs = new int[storageIds.length];
+    for (int i = 0; i < storageIds.length; ++i) {
+      if (Strings.isNullOrEmpty(storageIds[i])) {
+        diskIDs[i] = -1;
+        ++unknownDiskIdCount;
+      } else {
+        diskIDs[i] = DiskIdMapper.INSTANCE.getDiskId(hosts[i], storageIds[i]);
+      }
+    }
+    FileBlock.setDiskIds(diskIDs, fileBlock);
+    return unknownDiskIdCount;
   }
 
   /**
-   * For filesystems that don't override getFileBlockLocations, synthesize file blocks
+   * For filesystems that don't support BlockLocation API, synthesize file blocks
    * by manually splitting the file range into fixed-size blocks.  That way, scan
    * ranges can be derived from file blocks as usual.  All synthesized blocks are given
    * an invalid network address so that the scheduler will treat them as remote.
    */
-  private void synthesizeBlockMetadata(FileSystem fs, FileDescriptor fd,
+  private void synthesizeBlockMetadata(FileSystem fs, Path dirPath, HashMap<Path,
+      List<HdfsPartition>> partsByPath) throws IOException {
+    RemoteIterator<LocatedFileStatus> fileStatusIter = fs.listFiles(dirPath, true);
+    while (fileStatusIter.hasNext()) {
+      LocatedFileStatus fileStatus = fileStatusIter.next();
+      if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
+      Path partPathDir = fileStatus.getPath().getParent();
+      Preconditions.checkNotNull(partPathDir);
+      List<HdfsPartition> partitions = partsByPath.get(partPathDir);
+      // Skip if this file does not belong to any known partition.
+      if (partitions == null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("File " + fileStatus.getPath().toString() + " doesn't correspond " +
+              " to a known partition. Skipping metadata load for this file.");
+        }
+        continue;
+      }
+      String fileName = fileStatus.getPath().getName();
+      FileDescriptor fd = new FileDescriptor(fileName, fileStatus.getLen(),
+          fileStatus.getModificationTime());
+      Preconditions.checkState(partitions.size() > 0);
+      // For the purpose of synthesizing block metadata, we assume that all partitions
+      // with the same location have the same file format.
+      HdfsFileFormat fileFormat = partitions.get(0).getFileFormat();
+      synthesizeFdBlockMetadata(fs, fd, fileFormat);
+      // Update the partitions' metadata that this file belongs to.
+      for (HdfsPartition partition: partitions) {
+        partition.getFileDescriptors().add(fd);
+        numHdfsFiles_++;
+        totalHdfsBytes_ += fd.getFileLength();
+      }
+    }
+  }
+
+  /**
+   * Helper method to synthesize block metadata for file descriptor fd.
+   */
+  private void synthesizeFdBlockMetadata(FileSystem fs, FileDescriptor fd,
       HdfsFileFormat fileFormat) {
     long start = 0;
     long remaining = fd.getFileLength();
@@ -449,72 +477,6 @@ public class HdfsTable extends Table {
     }
   }
 
-  /**
-   * Populates disk/volume ID metadata inside the newly created THdfsFileBlocks.
-   * perFsFileBlocks maps from each filesystem to a FileBLocksInfo.  The first list
-   * contains the newly created THdfsFileBlocks and the second contains the
-   * corresponding BlockLocations.
-   */
-  private void loadDiskIds(Map<FsKey, FileBlocksInfo> perFsFileBlocks) {
-    if (!SUPPORTS_VOLUME_ID) return;
-    // Loop over each filesystem.  If the filesystem is DFS, retrieve the volume IDs
-    // for all the blocks.
-    for (FsKey fsKey: perFsFileBlocks.keySet()) {
-      FileSystem fs = fsKey.filesystem;
-      // Only DistributedFileSystem has getFileBlockStorageLocations().  It's not even
-      // part of the FileSystem interface, so we'll need to downcast.
-      if (!(fs instanceof DistributedFileSystem)) continue;
-
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Loading disk ids for: " + getFullName() + ". nodes: " +
-            hostIndex_.size() + ". filesystem: " + fsKey);
-      }
-      DistributedFileSystem dfs = (DistributedFileSystem)fs;
-      FileBlocksInfo blockLists = perFsFileBlocks.get(fsKey);
-      Preconditions.checkNotNull(blockLists);
-      BlockStorageLocation[] storageLocs = null;
-      try {
-        // Get the BlockStorageLocations for all the blocks
-        storageLocs = dfs.getFileBlockStorageLocations(blockLists.locations);
-      } catch (IOException e) {
-        LOG.error("Couldn't determine block storage locations for filesystem " +
-            fs + ":\n" + e.getMessage());
-        continue;
-      }
-      if (storageLocs == null || storageLocs.length == 0) {
-        LOG.warn("Attempted to get block locations for filesystem " + fs +
-            " but the call returned no results");
-        continue;
-      }
-      if (storageLocs.length != blockLists.locations.size()) {
-        // Block locations and storage locations didn't match up.
-        LOG.error("Number of block storage locations not equal to number of blocks: "
-            + "#storage locations=" + Long.toString(storageLocs.length)
-            + " #blocks=" + Long.toString(blockLists.locations.size()));
-        continue;
-      }
-      long unknownDiskIdCount = 0;
-      // Attach volume IDs given by the storage location to the corresponding
-      // THdfsFileBlocks.
-      for (int locIdx = 0; locIdx < storageLocs.length; ++locIdx) {
-        VolumeId[] volumeIds = storageLocs[locIdx].getVolumeIds();
-        THdfsFileBlock block = blockLists.blocks.get(locIdx);
-        // Convert opaque VolumeId to 0 based ids.
-        // TODO: the diskId should be eventually retrievable from Hdfs when the
-        // community agrees this API is useful.
-        int[] diskIds = new int[volumeIds.length];
-        for (int i = 0; i < volumeIds.length; ++i) {
-          diskIds[i] = getDiskId(volumeIds[i]);
-          if (diskIds[i] < 0) ++unknownDiskIdCount;
-        }
-        FileBlock.setDiskIds(diskIds, block);
-      }
-      if (unknownDiskIdCount > 0) {
-        LOG.warn("Unknown disk id count for filesystem " + fs + ":" + unknownDiskIdCount);
-      }
-    }
-  }
-
   @Override
   public TCatalogObjectType getCatalogObjectType() {
     return TCatalogObjectType.TABLE;
@@ -700,7 +662,6 @@ public class HdfsTable extends Table {
     nameToPartitionMap_.clear();
     partitionValuesMap_.clear();
     nullPartitionIds_.clear();
-    perPartitionFileDescMap_.clear();
     // Initialize partitionValuesMap_ and nullPartitionIds_. Also reset column stats.
     for (int i = 0; i < numClusteringCols_; ++i) {
       getColumns().get(i).getStats().setNumNulls(0);
@@ -734,7 +695,9 @@ public class HdfsTable extends Table {
   /**
    * Create HdfsPartition objects corresponding to 'msPartitions' and add them to this
    * table's partition list. Any partition metadata will be reset and loaded from
-   * scratch.
+   * scratch. For each partition created, we load the block metadata for each data file
+   * under it. We optimize the block metadata loading by grouping together the name node
+   * requests for all the partitions under the table base directory into a single RPC.
    *
    * If there are no partitions in the Hive metadata, a single partition is added with no
    * partition keys.
@@ -745,29 +708,37 @@ public class HdfsTable extends Table {
       CatalogException {
     Preconditions.checkNotNull(msTbl);
     initializePartitionMetadata(msTbl);
-    // Map of filesystem to the file blocks for new/modified FileDescriptors. Blocks in
-    // this map will have their disk volume IDs information (re)loaded. This is used to
-    // speed up the incremental refresh of a table's metadata by skipping unmodified,
-    // previously loaded blocks.
-    Map<FsKey, FileBlocksInfo> blocksToLoad = Maps.newHashMap();
+    // Map of partition paths to their corresponding HdfsPartition objects. Populated
+    // using createPartition() calls. A single partition path can correspond to multiple
+    // partitions.
+    HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
+    Path tblLocation = getHdfsBaseDirPath();
+    // List of directories that we scan for block locations. We optimize the block metadata
+    // loading to reduce the number of RPCs to the NN by separately loading partitions
+    // with default directory paths (under the base table directory) and non-default
+    // directory paths. For the former we issue a single RPC to the NN to load all the
+    // blocks from hdfsBaseDir_ and for the latter we load each partition directory
+    // separately.
+    // TODO: We can still do some advanced optimization by grouping all the partition
+    // directories under the same ancestor path up the tree.
+    List<Path> dirsToLoad = Lists.newArrayList(tblLocation);
+    FileSystem fs = tblLocation.getFileSystem(CONF);
     if (msTbl.getPartitionKeysSize() == 0) {
       Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty());
       // This table has no partition key, which means it has no declared partitions.
       // We model partitions slightly differently to Hive - every file must exist in a
       // partition, so add a single partition with no keys which will get all the
       // files in the table's root directory.
-      HdfsPartition part = createPartition(msTbl.getSd(), null, blocksToLoad);
+      HdfsPartition part = createPartition(msTbl.getSd(), null);
+      partsByPath.put(tblLocation, Lists.newArrayList(part));
       if (isMarkedCached_) part.markCached();
       addPartition(part);
-      Path location = new Path(hdfsBaseDir_);
-      FileSystem fs = location.getFileSystem(CONF);
-      if (fs.exists(location)) {
-        accessLevel_ = getAvailableAccessLevel(fs, location);
+      if (fs.exists(tblLocation)) {
+        accessLevel_ = getAvailableAccessLevel(fs, tblLocation);
       }
     } else {
       for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
-        HdfsPartition partition = createPartition(msPartition.getSd(), msPartition,
-            blocksToLoad);
+        HdfsPartition partition = createPartition(msPartition.getSd(), msPartition);
         addPartition(partition);
         // If the partition is null, its HDFS path does not exist, and it was not added
         // to this table's partition list. Skip the partition.
@@ -784,11 +755,41 @@ public class HdfsTable extends Table {
           // WRITE_ONLY the table's access level should be NONE.
           accessLevel_ = TAccessLevel.READ_ONLY;
         }
+        Path partDir = new Path(msPartition.getSd().getLocation());
+        List<HdfsPartition> parts = partsByPath.get(partDir);
+        if (parts == null) {
+          partsByPath.put(partDir, Lists.newArrayList(partition));
+        } else {
+          parts.add(partition);
+        }
+        if (!dirsToLoad.contains(partDir) &&
+            !FileSystemUtil.isDescendantPath(partDir, tblLocation)) {
+          // This partition has a custom filesystem location. Load its file/block
+          // metadata separately by adding it to the list of dirs to load.
+          dirsToLoad.add(partDir);
+        }
       }
     }
-    loadDiskIds(blocksToLoad);
+    if (LOG.isTraceEnabled()) LOG.trace("partsByPath size: " + partsByPath.size());
+    loadMetadataAndDiskIds(dirsToLoad, partsByPath);
   }
 
+  private void loadMetadataAndDiskIds(HdfsPartition partition) throws CatalogException {
+      Path partDirPath = partition.getLocationPath();
+      HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
+      partsByPath.put(partDirPath, Lists.newArrayList(partition));
+      loadMetadataAndDiskIds(Lists.newArrayList(partDirPath), partsByPath);
+  }
+
+  /**
+   * Helper method to load the block locations from each directory in 'locations'
+   * and filtering only the paths from 'partsByPath'. Also loads the disk IDs
+   * corresponding to these block locations.
+   */
+  private void loadMetadataAndDiskIds(List<Path> locations,
+      HashMap<Path, List<HdfsPartition>> partsByPath) {
+    for (Path location: locations) { loadBlockMetadata(location, partsByPath); }
+  }
   /**
    * Gets the AccessLevel that is available for Impala for this table based on the
    * permissions Impala has on the given path. If the path does not exist, recurses up
@@ -828,31 +829,25 @@ public class HdfsTable extends Table {
    * Throws CatalogException if the supplied storage descriptor contains metadata that
    * Impala can't understand.
    */
-  public HdfsPartition createPartition(StorageDescriptor storageDescriptor,
+  public HdfsPartition createAndLoadPartition(StorageDescriptor storageDescriptor,
       org.apache.hadoop.hive.metastore.api.Partition msPartition)
       throws CatalogException {
-    Map<FsKey, FileBlocksInfo> blocksToLoad = Maps.newHashMap();
-    HdfsPartition hdfsPartition = createPartition(storageDescriptor, msPartition,
-        blocksToLoad);
-    loadDiskIds(blocksToLoad);
+    HdfsPartition hdfsPartition = createPartition(storageDescriptor, msPartition);
+    loadMetadataAndDiskIds(hdfsPartition);
     return hdfsPartition;
   }
 
   /**
    * Creates a new HdfsPartition from a specified StorageDescriptor and an HMS partition
-   * object. It populates 'perFsFileBlock' with the blocks to be loaded for each file in
-   * the partition directory.
+   * object.
    */
   private HdfsPartition createPartition(StorageDescriptor storageDescriptor,
-      org.apache.hadoop.hive.metastore.api.Partition msPartition,
-      Map<FsKey, FileBlocksInfo> perFsFileBlocks)
+      org.apache.hadoop.hive.metastore.api.Partition msPartition)
       throws CatalogException {
     HdfsStorageDescriptor fileFormatDescriptor =
         HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor);
     List<LiteralExpr> keyValues = Lists.newArrayList();
-    boolean isMarkedCached = isMarkedCached_;
     if (msPartition != null) {
-      isMarkedCached = HdfsCachingUtil.validateCacheParams(msPartition.getParameters());
       // Load key values
       for (String partitionKey: msPartition.getValues()) {
         Type type = getColumns().get(keyValues.size()).getType();
@@ -869,12 +864,7 @@ public class HdfsTable extends Table {
           }
         }
       }
-      try {
-        Expr.analyze(keyValues, null);
-      } catch (AnalysisException e) {
-        // should never happen
-        throw new IllegalStateException(e);
-      }
+      for (Expr v: keyValues) v.analyzeNoThrow(null);
     }
 
     Path partDirPath = new Path(storageDescriptor.getLocation());
@@ -882,12 +872,12 @@ public class HdfsTable extends Table {
       FileSystem fs = partDirPath.getFileSystem(CONF);
       multipleFileSystems_ = multipleFileSystems_ ||
           !FileSystemUtil.isPathOnFileSystem(new Path(getLocation()), fs);
-      updatePartitionFds(partDirPath, isMarkedCached,
-          fileFormatDescriptor.getFileFormat(), perFsFileBlocks);
+      if (msPartition != null) {
+        HdfsCachingUtil.validateCacheParams(msPartition.getParameters());
+      }
       HdfsPartition partition =
           new HdfsPartition(this, msPartition, keyValues, fileFormatDescriptor,
-              perPartitionFileDescMap_.get(partDirPath.toString()).values(),
-              getAvailableAccessLevel(fs, partDirPath));
+          new ArrayList<FileDescriptor>(), getAvailableAccessLevel(fs, partDirPath));
       partition.checkWellFormed();
       return partition;
     } catch (IOException e) {
@@ -896,21 +886,6 @@ public class HdfsTable extends Table {
   }
 
   /**
-   * Add the given THdfsFileBlocks and BlockLocations to the FileBlockInfo for the
-   * given filesystem.
-   */
-  private void addPerFsFileBlocks(Map<FsKey, FileBlocksInfo> fsToBlocks, FileSystem fs,
-      List<THdfsFileBlock> blocks, List<BlockLocation> locations) {
-    FsKey fsKey = new FsKey(fs);
-    FileBlocksInfo infos = fsToBlocks.get(fsKey);
-    if (infos == null) {
-      infos = new FileBlocksInfo();
-      fsToBlocks.put(fsKey, infos);
-    }
-    infos.addBlocks(blocks, locations);
-  }
-
-  /**
    * Adds the partition to the HdfsTable. Throws a CatalogException if the partition
    * already exists in this table.
    */
@@ -983,7 +958,6 @@ public class HdfsTable extends Table {
     partitionIds_.remove(partitionId);
     partitionMap_.remove(partitionId);
     nameToPartitionMap_.remove(partition.getPartitionName());
-    perPartitionFileDescMap_.remove(partition.getLocation());
     for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
       ColumnStats stats = getColumns().get(i).getStats();
       LiteralExpr literal = partition.getPartitionValues().get(i);
@@ -1055,8 +1029,7 @@ public class HdfsTable extends Table {
    * metadata will be updated from the Hive Metastore.
    *
    * If 'loadFileMetadata' is true, file metadata of the specified partitions are
-   * reloaded while reusing existing file descriptors to avoid loading metadata for files
-   * that haven't changed. If 'partitionsToUpdate' is not specified, file metadata of all
+   * reloaded from scratch. If 'partitionsToUpdate' is not specified, file metadata of all
    * the partitions are loaded.
    *
    * If 'loadTableSchema' is true, the table schema is loaded from the Hive Metastore.
@@ -1148,10 +1121,9 @@ public class HdfsTable extends Table {
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     Preconditions.checkNotNull(msTbl);
     addDefaultPartition(msTbl.getSd());
-    Map<FsKey, FileBlocksInfo> fileBlocksToLoad = Maps.newHashMap();
-    HdfsPartition part = createPartition(msTbl.getSd(), null, fileBlocksToLoad);
+    HdfsPartition part = createPartition(msTbl.getSd(), null);
     addPartition(part);
-    loadDiskIds(fileBlocksToLoad);
+    loadMetadataAndDiskIds(part);
     if (isMarkedCached_) part.markCached();
   }
 
@@ -1452,10 +1424,8 @@ public class HdfsTable extends Table {
     msPartitions.addAll(MetaStoreUtil.fetchPartitionsByName(client,
         Lists.newArrayList(partitionNames), db_.getName(), name_));
 
-    Map<FsKey, FileBlocksInfo> fileBlocksToLoad = Maps.newHashMap();
     for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
-      HdfsPartition partition =
-          createPartition(msPartition.getSd(), msPartition, fileBlocksToLoad);
+      HdfsPartition partition = createPartition(msPartition.getSd(), msPartition);
       addPartition(partition);
       // If the partition is null, its HDFS path does not exist, and it was not added to
       // this table's partition list. Skip the partition.
@@ -1473,8 +1443,8 @@ public class HdfsTable extends Table {
         // WRITE_ONLY the table's access level should be NONE.
         accessLevel_ = TAccessLevel.READ_ONLY;
       }
+      loadMetadataAndDiskIds(partition);
     }
-    loadDiskIds(fileBlocksToLoad);
   }
 
   /**
@@ -1491,10 +1461,12 @@ public class HdfsTable extends Table {
     Preconditions.checkNotNull(msTbl);
     HdfsStorageDescriptor fileFormatDescriptor =
         HdfsStorageDescriptor.fromStorageDescriptor(this.name_, msTbl.getSd());
-    Map<FsKey, FileBlocksInfo> perFsFileBlocks = Maps.newHashMap();
     for (HdfsPartition partition: partitions) {
       org.apache.hadoop.hive.metastore.api.Partition msPart =
           partition.toHmsPartition();
+      if (msPart != null) {
+        HdfsCachingUtil.validateCacheParams(msPart.getParameters());
+      }
       StorageDescriptor sd = null;
       if (msPart == null) {
         // If this partition is not stored in the Hive Metastore (e.g. default partition
@@ -1504,10 +1476,8 @@ public class HdfsTable extends Table {
       } else {
         sd = msPart.getSd();
       }
-      loadPartitionFileMetadata(sd, partition, fileFormatDescriptor.getFileFormat(),
-          perFsFileBlocks);
+      loadPartitionFileMetadata(sd, partition);
     }
-    loadDiskIds(perFsFileBlocks);
   }
 
   /**
@@ -1517,82 +1487,19 @@ public class HdfsTable extends Table {
    * 'perFsFileBlocks' with file block info and updates table metadata.
    */
   private void loadPartitionFileMetadata(StorageDescriptor storageDescriptor,
-      HdfsPartition partition, HdfsFileFormat fileFormat,
-      Map<FsKey, FileBlocksInfo> perFsFileBlocks) throws Exception {
+      HdfsPartition partition) throws Exception {
     Preconditions.checkNotNull(storageDescriptor);
     Preconditions.checkNotNull(partition);
     org.apache.hadoop.hive.metastore.api.Partition msPart =
         partition.toHmsPartition();
-    boolean isMarkedCached = isMarkedCached_;
-    if (msPart != null) {
-      isMarkedCached = HdfsCachingUtil.validateCacheParams(msPart.getParameters());
-    }
     Path partDirPath = new Path(storageDescriptor.getLocation());
     FileSystem fs = partDirPath.getFileSystem(CONF);
     if (!fs.exists(partDirPath)) return;
 
-    String partitionDir = partDirPath.toString();
     numHdfsFiles_ -= partition.getNumFileDescriptors();
     totalHdfsBytes_ -= partition.getSize();
     Preconditions.checkState(numHdfsFiles_ >= 0 && totalHdfsBytes_ >= 0);
-    updatePartitionFds(partDirPath, isMarkedCached, fileFormat, perFsFileBlocks);
-    List<FileDescriptor> fileDescs = Lists.newArrayList(
-        perPartitionFileDescMap_.get(partDirPath.toString()).values());
-    partition.setFileDescriptors(fileDescs);
-    totalHdfsBytes_ += partition.getSize();
-    numHdfsFiles_ += fileDescs.size();
-  }
-
-  /**
-   * Updates the file descriptors of a partition directory specified by 'partitionPath'
-   * and loads block metadata of new/modified files. Reuses existing FileDescriptors for
-   * unchanged files (indicated by unchanged mtime). The one exception is if the
-   * partition is marked as cached (HDFS caching) in which case the block metadata
-   * cannot be reused. Otherwise, creates new FileDescriptors and adds them to
-   * perPartitionFileDescMap_. 'fileFomat' is the file format of the files in this
-   * partition directory. 'perFsFileBlocks' is populated with the loaded block metadata.
-   */
-  private void updatePartitionFds(Path partitionPath,
-      boolean isMarkedCached, HdfsFileFormat fileFormat,
-      Map<FsKey, FileBlocksInfo> perFsFileBlocks) throws CatalogException {
-    Preconditions.checkNotNull(partitionPath);
-    String partPathStr = partitionPath.toString();
-    try {
-      FileSystem fs = partitionPath.getFileSystem(CONF);
-      if (!fs.exists(partitionPath)) {
-        perPartitionFileDescMap_.put(
-            partPathStr, Maps.<String, FileDescriptor>newHashMap());
-        return;
-      }
-      Map<String, FileDescriptor> fileDescMap =
-          perPartitionFileDescMap_.get(partPathStr);
-      Map<String, FileDescriptor> newFileDescMap = Maps.newHashMap();
-      // Get all the files in the partition directory
-      for (FileStatus fileStatus: fs.listStatus(partitionPath)) {
-        String fileName = fileStatus.getPath().getName().toString();
-        if (fileStatus.isDirectory() || FileSystemUtil.isHiddenFile(fileName) ||
-          HdfsCompression.fromFileName(fileName) == HdfsCompression.LZO_INDEX) {
-          // Ignore directory, hidden file starting with . or _, and LZO index files
-          // If a directory is erroneously created as a subdirectory of a partition dir
-          // we should ignore it and move on. Hive will not recurse into directories.
-          // Skip index files, these are read by the LZO scanner directly.
-          continue;
-        }
-        FileDescriptor fd = fileDescMap != null ? fileDescMap.get(fileName) : null;
-        if (fd == null || isMarkedCached || fd.getFileLength() != fileStatus.getLen()
-          || fd.getModificationTime() != fileStatus.getModificationTime()) {
-          // Metadata of cached or modified files are not reused.
-          fd = new FileDescriptor(fileName, fileStatus.getLen(),
-              fileStatus.getModificationTime());
-          loadBlockMetadata(fs, fileStatus, fd, fileFormat, perFsFileBlocks);
-        }
-        newFileDescMap.put(fileName, fd);
-      }
-      perPartitionFileDescMap_.put(partPathStr, newFileDescMap);
-    } catch (Exception e) {
-      throw new CatalogException("Failed to retrieve file descriptors from path " +
-        partitionPath, e);
-    }
+    loadMetadataAndDiskIds(partition);
   }
 
   @Override
@@ -1697,6 +1604,7 @@ public class HdfsTable extends Table {
   public long getNumHdfsFiles() { return numHdfsFiles_; }
   public long getTotalHdfsBytes() { return totalHdfsBytes_; }
   public String getHdfsBaseDir() { return hdfsBaseDir_; }
+  public Path getHdfsBaseDirPath() { return new Path(hdfsBaseDir_); }
   public boolean isAvroTable() { return avroSchema_ != null; }
 
   /**
@@ -2022,7 +1930,7 @@ public class HdfsTable extends Table {
    */
   public void reloadPartition(HdfsPartition oldPartition, Partition hmsPartition)
       throws CatalogException {
-    HdfsPartition refreshedPartition = createPartition(
+    HdfsPartition refreshedPartition = createAndLoadPartition(
         hmsPartition.getSd(), hmsPartition);
     Preconditions.checkArgument(oldPartition == null
         || oldPartition.compareTo(refreshedPartition) == 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6662c553/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 71eea88..4767837 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.impala.catalog.HdfsCompression;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Preconditions;
@@ -280,12 +281,25 @@ public class FileSystemUtil {
   }
 
   /**
-   * Returns true if the filesystem might override getFileBlockLocations().
+   * Returns true if the file corresponding to 'fileStatus' is a valid data file as
+   * per Impala's partitioning rules. A fileStatus is considered invalid if it is a
+   * directory/hidden file/LZO index file. LZO index files are skipped because they are
+   * read by the scanner directly. Currently Impala doesn't allow subdirectories in the
+   * partition paths.
    */
-  public static boolean hasGetFileBlockLocations(FileSystem fs) {
+  public static boolean isValidDataFile(FileStatus fileStatus) {
+    String fileName = fileStatus.getPath().getName();
+    return !(fileStatus.isDirectory() || FileSystemUtil.isHiddenFile(fileName) ||
+        HdfsCompression.fromFileName(fileName) == HdfsCompression.LZO_INDEX);
+  }
+
+  /**
+   * Returns true if the filesystem supports storage UUIDs in BlockLocation calls.
+   */
+  public static boolean supportsStorageIds(FileSystem fs) {
     // Common case.
     if (isDistributedFileSystem(fs)) return true;
-    // Blacklist FileSystems that are known to not implement getFileBlockLocations().
+    // Blacklist FileSystems that are known not to include storage UUIDs.
     return !(fs instanceof S3AFileSystem || fs instanceof LocalFileSystem);
   }
 
@@ -409,6 +423,16 @@ public class FileSystemUtil {
   }
 
   /**
+   * Returns true if Path 'p' is a descendant of Path 'parent', false otherwise.
+   */
+  public static boolean isDescendantPath(Path p, Path parent) {
+    if (p == null || parent == null) return false;
+    while (!p.isRoot() && p.depth() != parent.depth()) p = p.getParent();
+    if (p.isRoot()) return false;
+    return p.equals(parent);
+  }
+
+  /**
    * Returns the configuration.
    */
   public static Configuration getConfiguration() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6662c553/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 4087818..f878b12 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -602,7 +602,8 @@ public class CatalogOpExecutor {
       throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
     }
     HdfsTable hdfsTable = (HdfsTable) tbl;
-    HdfsPartition hdfsPartition = hdfsTable.createPartition(partition.getSd(), partition);
+    HdfsPartition hdfsPartition =
+        hdfsTable.createAndLoadPartition(partition.getSd(), partition);
     return catalog_.addPartition(hdfsPartition);
   }
 


[2/7] incubator-impala git commit: IMPALA-4572: Run COMPUTE STATS on Parquet tables with MT_DOP=4.

Posted by sa...@apache.org.
IMPALA-4572: Run COMPUTE STATS on Parquet tables with MT_DOP=4.

COMPUTE STATS on Parquet tables is run with MT_DOP=4 by default.
COMPUTE STATS on non-Parquet tables will run without MT_DOP.

Users can always override this behavior by setting MT_DOP manually.
Setting MT_DOP to 0 means a statement runs in the conventional
execution mode (without intra-node parallelism based on multiple
fragment instances). Users can set a higher MT_DOP even for Parquet
tables.
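
A minimal sketch of the defaulting logic described above; the real change
lives in Frontend.createExecRequest() (not fully shown in this excerpt),
and the accessor names used here are assumptions:

    // If the user did not set MT_DOP and the statement is COMPUTE STATS over
    // Parquet-only partitions, default MT_DOP to 4; otherwise leave it as-is.
    TQueryOptions queryOptions = queryCtx.getRequest().getQuery_options();
    if (!queryOptions.isSetMt_dop() && analysisResult.isComputeStatsStmt()
        && ((ComputeStatsStmt) analysisResult.getStmt()).isParquetOnly()) {
      queryOptions.setMt_dop(4);
    }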

Testing: Added a new test that checks the effective MT_DOP.
Locally ran test_mt_dop.py, test_scanners.py, test_nested_types.py,
test_compute_stats.py, and test_cancellation.py.

Change-Id: I2be3c7c9f3004e9a759224a2e5756eb6e4efa359
Reviewed-on: http://gerrit.cloudera.org:8080/5315
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7efa0831
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7efa0831
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7efa0831

Branch: refs/heads/master
Commit: 7efa08316ecb8f73d1c968ed602d11d40c714a1f
Parents: 6662c55
Author: Alex Behm <al...@cloudera.com>
Authored: Thu Dec 1 13:58:19 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Dec 3 22:28:53 2016 +0000

----------------------------------------------------------------------
 common/thrift/ImpalaInternalService.thrift      |  3 +-
 .../impala/analysis/ComputeStatsStmt.java       | 20 ++++++++
 .../org/apache/impala/planner/HdfsScanNode.java |  3 +-
 .../impala/planner/SingleNodePlanner.java       | 10 ++--
 .../org/apache/impala/service/Frontend.java     | 18 ++++++-
 .../org/apache/impala/planner/PlannerTest.java  | 52 ++++++++++++++++++++
 .../apache/impala/planner/PlannerTestBase.java  |  4 +-
 .../org/apache/impala/service/FrontendTest.java |  4 +-
 tests/query_test/test_mt_dop.py                 |  4 +-
 9 files changed, 107 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 4f81e27..f18947a 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -191,7 +191,8 @@ struct TQueryOptions {
   // query per backend.
   // > 0: multi-threaded execution mode, with given dop
   // 0: single-threaded execution mode
-  44: optional i32 mt_dop = 0
+  // unset: may be set automatically to > 0 in createExecRequest(), otherwise same as 0
+  44: optional i32 mt_dop
 
   // If true, INSERT writes to S3 go directly to their final location rather than being
   // copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 84b866b..90c46a8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.analysis;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -24,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.HBaseTable;
+import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Table;
@@ -528,6 +530,24 @@ public class ComputeStatsStmt extends StatementBase {
   public String getTblStatsQuery() { return tableStatsQueryStr_; }
   public String getColStatsQuery() { return columnStatsQueryStr_; }
 
+  /**
+   * Returns true if this statement computes stats on Parquet partitions only,
+   * false otherwise.
+   */
+  public boolean isParquetOnly() {
+    if (!(table_ instanceof HdfsTable)) return false;
+    Collection<HdfsPartition> affectedPartitions = null;
+    if (partitionSet_ != null) {
+      affectedPartitions = partitionSet_.getPartitions();
+    } else {
+      affectedPartitions = ((HdfsTable) table_).getPartitions();
+    }
+    for (HdfsPartition partition: affectedPartitions) {
+      if (partition.getFileFormat() != HdfsFileFormat.PARQUET) return false;
+    }
+    return true;
+  }
+
   @Override
   public String toSql() {
     if (!isIncremental_) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 0aee399..9642b97 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -173,7 +173,8 @@ public class HdfsScanNode extends ScanNode {
 
     // Determine backend scan node implementation to use. The optimized MT implementation
     // is currently only supported for Parquet.
-    if (analyzer.getQueryOptions().mt_dop > 0 &&
+    if (analyzer.getQueryOptions().isSetMt_dop() &&
+        analyzer.getQueryOptions().mt_dop > 0 &&
         fileFormats.size() == 1 && fileFormats.contains(HdfsFileFormat.PARQUET)) {
       useMtScanNode_ = true;
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 1634dd2..4bc8a88 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -158,14 +158,18 @@ public class SingleNodePlanner {
    * Throws a NotImplementedException if plan validation fails.
    */
   public void validatePlan(PlanNode planNode) throws NotImplementedException {
-    if (ctx_.getQueryOptions().mt_dop > 0 && !RuntimeEnv.INSTANCE.isTestEnv()
+    if (ctx_.getQueryOptions().isSetMt_dop() && ctx_.getQueryOptions().mt_dop > 0
+        && !RuntimeEnv.INSTANCE.isTestEnv()
         && (planNode instanceof JoinNode || ctx_.hasTableSink())) {
       throw new NotImplementedException(
           "MT_DOP not supported for plans with base table joins or table sinks.");
     }
 
-    // As long as MT_DOP == 0 any join can run in a single-node plan.
-    if (ctx_.isSingleNodeExec() && ctx_.getQueryOptions().mt_dop == 0) return;
+    // As long as MT_DOP is unset or 0 any join can run in a single-node plan.
+    if (ctx_.isSingleNodeExec() &&
+        (!ctx_.getQueryOptions().isSetMt_dop() || ctx_.getQueryOptions().mt_dop == 0)) {
+      return;
+    }
 
     if (planNode instanceof NestedLoopJoinNode) {
       JoinNode joinNode = (JoinNode) planNode;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index e7eabb1..c98ba49 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -114,6 +114,7 @@ import org.apache.impala.thrift.TPlanExecInfo;
 import org.apache.impala.thrift.TPlanFragment;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryExecRequest;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TResetMetadataRequest;
 import org.apache.impala.thrift.TResultRow;
 import org.apache.impala.thrift.TResultSet;
@@ -978,8 +979,9 @@ public class Frontend {
       Planner planner, StringBuilder explainString) throws ImpalaException {
     TQueryCtx queryCtx = planner.getQueryCtx();
     AnalysisContext.AnalysisResult analysisResult = planner.getAnalysisResult();
-    boolean isMtExec =
-        analysisResult.isQueryStmt() && queryCtx.request.query_options.mt_dop > 0;
+    boolean isMtExec = analysisResult.isQueryStmt() &&
+        queryCtx.request.query_options.isSetMt_dop() &&
+        queryCtx.request.query_options.mt_dop > 0;
 
     List<PlanFragment> planRoots = Lists.newArrayList();
     TQueryExecRequest result = new TQueryExecRequest();
@@ -1038,6 +1040,7 @@ public class Frontend {
     result.setAccess_events(analysisResult.getAccessEvents());
     result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
 
+    TQueryOptions queryOptions = queryCtx.request.query_options;
     if (analysisResult.isCatalogOp()) {
       result.stmt_type = TStmtType.DDL;
       createCatalogOpRequest(analysisResult, result);
@@ -1045,6 +1048,15 @@ public class Frontend {
       if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
         result.catalog_op_request.setLineage_graph(thriftLineageGraph);
       }
+      // Set MT_DOP=4 for COMPUTE STATS on Parquet tables, unless the user has already
+      // provided another value for MT_DOP.
+      if (!queryOptions.isSetMt_dop() &&
+          analysisResult.isComputeStatsStmt() &&
+          analysisResult.getComputeStatsStmt().isParquetOnly()) {
+        queryOptions.setMt_dop(4);
+      }
+      // If unset, set MT_DOP to 0 to simplify the rest of the code.
+      if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0);
       // All DDL operations except for CTAS are done with analysis at this point.
       if (!analysisResult.isCreateTableAsSelectStmt()) return result;
     } else if (analysisResult.isLoadDataStmt()) {
@@ -1061,6 +1073,8 @@ public class Frontend {
       result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
       return result;
     }
+    // If unset, set MT_DOP to 0 to simplify the rest of the code.
+    if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0);
 
     // create TQueryExecRequest
     Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index dce32a6..8c48ee4 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -17,14 +17,22 @@
 
 package org.apache.impala.planner;
 
+import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.Db;
+import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.testutil.TestUtils;
+import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterMode;
+import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
+import jline.internal.Preconditions;
+
 // All planner tests, except for S3 specific tests should go here.
 public class PlannerTest extends PlannerTestBase {
 
@@ -308,4 +316,48 @@ public class PlannerTest extends PlannerTestBase {
       RuntimeEnv.INSTANCE.setTestEnv(true);
     }
   }
+
+  @Test
+  public void testComputeStatsMtDop() {
+    for (int mtDop: new int[] {-1, 0, 1, 16}) {
+      int effectiveMtDop = (mtDop != -1) ? mtDop : 0;
+      // MT_DOP is not set automatically for stmt other than COMPUTE STATS.
+      testEffectiveMtDop(
+          "select * from functional_parquet.alltypes", mtDop, effectiveMtDop);
+      // MT_DOP is not set automatically for COMPUTE STATS on non-Parquet tables.
+      testEffectiveMtDop(
+          "compute stats functional.alltypes", mtDop, effectiveMtDop);
+    }
+    // MT_DOP is set automatically for COMPUTE STATS on Parquet tables,
+    // but can be overridden by a user-provided MT_DOP.
+    testEffectiveMtDop("compute stats functional_parquet.alltypes", -1, 4);
+    testEffectiveMtDop("compute stats functional_parquet.alltypes", 0, 0);
+    testEffectiveMtDop("compute stats functional_parquet.alltypes", 1, 1);
+    testEffectiveMtDop("compute stats functional_parquet.alltypes", 16, 16);
+  }
+
+  /**
+   * Creates an exec request for 'stmt' setting the MT_DOP query option to 'userMtDop',
+   * or leaving it unset if 'userMtDop' is -1. Asserts that the MT_DOP of the generated
+   * exec request is equal to 'expectedMtDop'.
+   */
+  private void testEffectiveMtDop(String stmt, int userMtDop, int expectedMtDop) {
+    TQueryCtx queryCtx = TestUtils.createQueryContext(
+        Catalog.DEFAULT_DB, System.getProperty("user.name"));
+    queryCtx.request.setStmt(stmt);
+    queryCtx.request.query_options = defaultQueryOptions();
+    if (userMtDop != -1) queryCtx.request.query_options.setMt_dop(userMtDop);
+    StringBuilder explainBuilder = new StringBuilder();
+    TExecRequest request = null;
+    try {
+      request = frontend_.createExecRequest(queryCtx, explainBuilder);
+    } catch (ImpalaException e) {
+      Assert.fail("Failed to create exec request for '" + stmt + "': " + e.getMessage());
+    }
+    Preconditions.checkNotNull(request);
+    int actualMtDop = -1;
+    if (request.query_options.isSetMt_dop()) actualMtDop = request.query_options.mt_dop;
+    // Check that the effective MT_DOP is as expected.
+    Assert.assertEquals(actualMtDop, expectedMtDop);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 4fff233..5e6dbc7 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -364,7 +364,7 @@ public class PlannerTestBase extends FrontendTestBase {
   /**
    * Merge the options of b into a and return a
    */
-  private TQueryOptions mergeQueryOptions(TQueryOptions a, TQueryOptions b) {
+  protected TQueryOptions mergeQueryOptions(TQueryOptions a, TQueryOptions b) {
     for(TQueryOptions._Fields f : TQueryOptions._Fields.values()) {
       if (b.isSet(f)) {
         a.setFieldValue(f, b.getFieldValue(f));
@@ -484,7 +484,7 @@ public class PlannerTestBase extends FrontendTestBase {
           ImpalaInternalServiceConstants.NUM_NODES_ALL);
     }
     if (section == Section.PARALLELPLANS) {
-      queryCtx.request.query_options.mt_dop = 2;
+      queryCtx.request.query_options.setMt_dop(2);
     }
     ArrayList<String> expectedPlan = testCase.getSectionContents(section);
     boolean sectionExists = expectedPlan != null && !expectedPlan.isEmpty();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/fe/src/test/java/org/apache/impala/service/FrontendTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/service/FrontendTest.java b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
index dfbdb12..dd6a6c8 100644
--- a/fe/src/test/java/org/apache/impala/service/FrontendTest.java
+++ b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
@@ -32,7 +32,6 @@ import org.apache.hive.service.cli.thrift.TGetInfoReq;
 import org.apache.hive.service.cli.thrift.TGetSchemasReq;
 import org.apache.hive.service.cli.thrift.TGetTablesReq;
 import org.junit.Test;
-
 import org.apache.impala.analysis.AuthorizationTest;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.catalog.Catalog;
@@ -41,11 +40,14 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.testutil.ImpaladTestCatalog;
 import org.apache.impala.testutil.TestUtils;
+import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TMetadataOpcode;
 import org.apache.impala.thrift.TQueryCtx;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TResultRow;
 import org.apache.impala.thrift.TResultSet;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7efa0831/tests/query_test/test_mt_dop.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
index ff60b60..1d522fd 100644
--- a/tests/query_test/test_mt_dop.py
+++ b/tests/query_test/test_mt_dop.py
@@ -25,7 +25,9 @@ from tests.common.skip import SkipIfOldAggsJoins
 from tests.common.test_vector import TestDimension
 from tests.common.test_vector import TestVector
 
-MT_DOP_VALUES = [1, 2, 8]
+# COMPUTE STATS on Parquet tables automatically sets MT_DOP=4, so include
+# the value 0 to cover the non-MT path as well.
+MT_DOP_VALUES = [0, 1, 2, 8]
 
 class TestMtDop(ImpalaTestSuite):
   @classmethod


[5/7] incubator-impala git commit: IMPALA-4303: Do not reset() qualifier of union operands.

Posted by sa...@apache.org.
IMPALA-4303: Do not reset() qualifier of union operands.

The bug: We used to reset() the qualifier of union operands
to their original value obtained during parsing. This leads to
problems when union operands are unnested and we need to rewrite
Subqueries. In particular, the first union operand of a nested union
was reset() to a null qualifier, but that operand could be somewhere
in the middle of the list of unnested operands in the parent. At that
point, we've lost information about the qualifier of the unnested
operand.

The fix: The simplest solution is to not reset() the qualifier.
The other alternative would be to reset() the qualifier, but also
undo any unnesting. That seems unnecessary and wasteful.
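
For intuition, a self-contained Java sketch with simplified stand-ins (these are
not the real UnionStmt/UnionOperand classes) of why restoring the parsed
qualifier is lossy once operands have been unnested:

import java.util.ArrayList;
import java.util.List;

public class QualifierSketch {
  enum Qualifier { ALL, DISTINCT }

  static class Operand {
    final Qualifier parsed;  // as produced by the parser; null for a first operand
    Qualifier effective;     // adjusted during unnesting and distinct propagation
    Operand(Qualifier parsed) { this.parsed = parsed; this.effective = parsed; }
  }

  public static void main(String[] args) {
    // A UNION ALL (B UNION ALL C) unnests to [A, B, C]. B was the first operand
    // of the nested union, so its parsed qualifier is null even though it now
    // sits in the middle of the parent's operand list.
    List<Operand> unnested = new ArrayList<>();
    unnested.add(new Operand(null));            // A
    unnested.add(new Operand(null));            // B
    unnested.add(new Operand(Qualifier.ALL));   // C
    for (Operand op : unnested) op.effective = Qualifier.ALL;
    // A reset() that restored op.effective = op.parsed would leave B with no
    // qualifier mid-list; that is the information loss this change avoids by
    // keeping the effective qualifier across reset().
    for (Operand op : unnested) System.out.println(op.effective);
  }
}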

Change-Id: I157bb0f08c4a94fd779487d7c23edd64a537a1f6
Reviewed-on: http://gerrit.cloudera.org:8080/4963
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/852e272b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/852e272b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/852e272b

Branch: refs/heads/master
Commit: 852e272b32a5424e1ea31af7b3d532dac0f916b3
Parents: 1da5701
Author: Alex Behm <al...@cloudera.com>
Authored: Fri Nov 4 17:52:47 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Mon Dec 5 00:58:30 2016 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/UnionStmt.java   | 27 +++++----
 .../queries/PlannerTest/subquery-rewrite.test   | 59 ++++++++++++++++++++
 2 files changed, 74 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/852e272b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
index d5f1ef1..a0e7164 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
@@ -52,23 +52,19 @@ public class UnionStmt extends QueryStmt {
   }
 
   /**
-   * Represents an operand to a union, created by the parser.
-   * Contains a query statement and the all/distinct qualifier
-   * of the union operator (null for the first queryStmt).
+   * Represents an operand to a union. It consists of a query statement and its left
+   * all/distinct qualifier (null for the first operand).
    */
   public static class UnionOperand {
-    // Qualifier as seen by the parser. Null for the first operand.
-    private final Qualifier originalQualifier_;
+    // Effective qualifier. Should not be reset() to preserve changes made during
+    // distinct propagation and unnesting that are needed after rewriting Subqueries.
+    private Qualifier qualifier_;
 
     /////////////////////////////////////////
     // BEGIN: Members that need to be reset()
 
     private final QueryStmt queryStmt_;
 
-    // Effective qualifier. Possibly different from parsedQualifier_ due
-    // to DISTINCT propagation.
-    private Qualifier qualifier_;
-
     // Analyzer used for this operand. Set in analyze().
     // We must preserve the conjuncts registered in the analyzer for partition pruning.
     private Analyzer analyzer_;
@@ -81,7 +77,6 @@ public class UnionStmt extends QueryStmt {
 
     public UnionOperand(QueryStmt queryStmt, Qualifier qualifier) {
       queryStmt_ = queryStmt;
-      originalQualifier_ = qualifier;
       qualifier_ = qualifier;
       smap_ = new ExprSubstitutionMap();
     }
@@ -114,7 +109,6 @@ public class UnionStmt extends QueryStmt {
      */
     private UnionOperand(UnionOperand other) {
       queryStmt_ = other.queryStmt_.clone();
-      originalQualifier_ = other.originalQualifier_;
       qualifier_ = other.qualifier_;
       analyzer_ = other.analyzer_;
       smap_ = other.smap_.clone();
@@ -122,7 +116,6 @@ public class UnionStmt extends QueryStmt {
 
     public void reset() {
       queryStmt_.reset();
-      qualifier_ = originalQualifier_;
       analyzer_ = null;
       smap_.clear();
     }
@@ -373,6 +366,9 @@ public class UnionStmt extends QueryStmt {
       unnestOperand(allOperands_, Qualifier.ALL, operands_.get(i));
     }
 
+    for (UnionOperand op: distinctOperands_) op.setQualifier(Qualifier.DISTINCT);
+    for (UnionOperand op: allOperands_) op.setQualifier(Qualifier.ALL);
+
     operands_.clear();
     operands_.addAll(distinctOperands_);
     operands_.addAll(allOperands_);
@@ -608,6 +604,13 @@ public class UnionStmt extends QueryStmt {
   @Override
   public UnionStmt clone() { return new UnionStmt(this); }
 
+  /**
+   * Undoes all changes made by analyze() except distinct propagation and unnesting.
+   * After analysis, operands_ contains the list of unnested operands with qualifiers
+   * adjusted to reflect distinct propagation. Every operand in that list is reset().
+   * The distinctOperands_ and allOperands_ are cleared because they are redundant
+   * with operands_.
+   */
   @Override
   public void reset() {
     super.reset();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/852e272b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
index 85559b6..cf723d2 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
@@ -2053,3 +2053,62 @@ PLAN-ROOT SINK
 |
 00:EMPTYSET
 ====
+# IMPALA-4303: Test subquery rewriting with nested unions.
+select * from functional.alltypestiny
+where exists (select 1 from functional.alltypes where int_col < 10)
+union all
+  (select * from functional.alltypestiny where year=2009 and month=1
+   union all
+   select * from functional.alltypestiny where year=2009 and month=2)
+---- PLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|
+|--05:SCAN HDFS [functional.alltypestiny]
+|     partitions=1/4 files=1 size=115B
+|
+|--04:SCAN HDFS [functional.alltypestiny]
+|     partitions=1/4 files=1 size=115B
+|
+03:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|
+|--02:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|     predicates: int_col < 10
+|     limit: 1
+|
+01:SCAN HDFS [functional.alltypestiny]
+   partitions=4/4 files=4 size=460B
+====
+# IMPALA-4303: Test subquery rewriting with nested unions.
+select * from functional.alltypestiny
+where exists (select 1 from functional.alltypes where int_col < 10)
+union distinct
+  (select * from functional.alltypestiny where year=2009 and month=1
+   union distinct
+   select * from functional.alltypestiny where year=2009 and month=2)
+---- PLAN
+PLAN-ROOT SINK
+|
+06:AGGREGATE [FINALIZE]
+|  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+|
+00:UNION
+|
+|--05:SCAN HDFS [functional.alltypestiny]
+|     partitions=1/4 files=1 size=115B
+|
+|--04:SCAN HDFS [functional.alltypestiny]
+|     partitions=1/4 files=1 size=115B
+|
+03:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|
+|--02:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|     predicates: int_col < 10
+|     limit: 1
+|
+01:SCAN HDFS [functional.alltypestiny]
+   partitions=4/4 files=4 size=460B
+====


[3/7] incubator-impala git commit: IMPALA-3875: Thrift threaded server hang in some cases

Posted by sa...@apache.org.
IMPALA-3875: Thrift threaded server hang in some cases

We use socket timeouts for our backend connections. We set these
timeouts only after we've open()'d the connection, which ideally
should be fine. However, our TSaslTransport stack does read()'s and
write()'s over the network on an open(), which means that on a secure
cluster we send and receive non-TCP-handshake packets on open(). This
is because the current code tries to establish a SASL handshake during
open().

If for any reason the peer server does not respond to the read()'s
during the open() call (after connect() is successful), the client
will wait on read() indefinitely. This patch sets the socket timeout
before we call open(), so that the read()'s and write()'s during the
open() are subject to the timeout as well.
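
The same ordering, sketched with the Java Thrift client purely for illustration
(the patch itself changes the C++ ClientCacheHelper; the host, port and timeout
values below are made up):

import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;

public class TimeoutBeforeOpen {
  public static void main(String[] args) throws TTransportException {
    TSocket socket = new TSocket("impalad.example.com", 22000);  // hypothetical endpoint
    // Set the timeout before open() so that any reads/writes issued inside
    // open(), e.g. a SASL handshake on a secure cluster, are bounded too.
    socket.setTimeout(30000);
    socket.open();   // with the old ordering, timeouts were applied only after this call
    socket.close();
  }
}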

We should also consider making a larger change where this SASL
handshake does not take place during an open(), but instead after the
open() call is completed, so as to have the open() semantics be the
same for both secure and insecure clusters.

Change-Id: I6c8f91a88f723e0e58e81bb385c5a8f190021868
Reviewed-on: http://gerrit.cloudera.org:8080/5263
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ff629c2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ff629c2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ff629c2d

Branch: refs/heads/master
Commit: ff629c2deb97b4ef25e80745bf4689dcbe8407fe
Parents: 7efa083
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Tue Nov 29 13:12:48 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Dec 3 23:21:10 2016 +0000

----------------------------------------------------------------------
 be/src/rpc/thrift-client.h           |  2 ++
 be/src/runtime/client-cache.cc       | 15 +++++++++++----
 be/src/statestore/statestore-test.cc |  6 +-----
 3 files changed, 14 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff629c2d/be/src/rpc/thrift-client.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-client.h b/be/src/rpc/thrift-client.h
index fae193a..a194551 100644
--- a/be/src/rpc/thrift-client.h
+++ b/be/src/rpc/thrift-client.h
@@ -67,6 +67,8 @@ class ThriftClientImpl {
   /// Set send timeout on the underlying TSocket.
   void setSendTimeout(int32_t ms) { socket_->setSendTimeout(ms); }
 
+  Status socket_create_status() { return socket_create_status_; }
+
  protected:
   ThriftClientImpl(const std::string& ipaddress, int port, bool ssl)
       : address_(MakeNetworkAddress(ipaddress, port)), ssl_(ssl) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff629c2d/be/src/runtime/client-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index 578ef29..4f403ad 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -110,15 +110,22 @@ Status ClientCacheHelper::CreateClient(const TNetworkAddress& address,
     ClientFactory factory_method, ClientKey* client_key) {
   shared_ptr<ThriftClientImpl> client_impl(factory_method(address, client_key));
   VLOG(2) << "CreateClient(): creating new client for " << client_impl->address();
-  Status status = client_impl->OpenWithRetry(num_tries_, wait_ms_);
-  if (!status.ok()) {
-    *client_key = NULL;
-    return status;
+
+  if (!client_impl->socket_create_status().ok()) {
+    *client_key = nullptr;
+    return client_impl->socket_create_status();
   }
+
   // Set the TSocket's send and receive timeouts.
   client_impl->setRecvTimeout(recv_timeout_ms_);
   client_impl->setSendTimeout(send_timeout_ms_);
 
+  Status status = client_impl->OpenWithRetry(num_tries_, wait_ms_);
+  if (!status.ok()) {
+    *client_key = nullptr;
+    return status;
+  }
+
   // Because the client starts life 'checked out', we don't add it to its host cache.
   {
     lock_guard<mutex> lock(client_map_lock_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff629c2d/be/src/statestore/statestore-test.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-test.cc b/be/src/statestore/statestore-test.cc
index 2c518a1..8c81383 100644
--- a/be/src/statestore/statestore-test.cc
+++ b/be/src/statestore/statestore-test.cc
@@ -94,8 +94,4 @@ TEST(StatestoreSslTest, SmokeTest) {
 
 }
 
-int main(int argc, char **argv) {
-  InitCommonRuntime(argc, argv, false);
-  ::testing::InitGoogleTest(&argc, argv);
-  return RUN_ALL_TESTS();
-}
+IMPALA_TEST_MAIN();


[6/7] incubator-impala git commit: IMPALA-3125: Fix assignment of equality predicates from an outer-join On-clause.

Posted by sa...@apache.org.
IMPALA-3125: Fix assignment of equality predicates from an outer-join On-clause.

Impala used to incorrectly assign On-clause equality predicates from an
outer join if those predicates referenced multiple tables, but only one
side of the outer join.

The fix is to add an additional check in Analyzer.getEqJoinConjuncts()
to prevent that incorrect assignment.
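
A simplified, standalone Java sketch of the added check (the real method is
Analyzer.canEvalOuterJoinedConjunct() and operates on Expr and TupleId; the
integer ids below are made up):

import java.util.List;
import java.util.Set;

public class OuterJoinConjunctCheck {
  // An On-clause conjunct of an outer join may only be evaluated by a node that
  // materializes every table ref of that outer join.
  static boolean canEvalOuterJoinedConjunct(Set<Integer> ojTableRefIds,
      List<Integer> nodeTableRefIds) {
    if (ojTableRefIds == null) return true;  // not from an outer-join On-clause
    return nodeTableRefIds.containsAll(ojTableRefIds);
  }

  public static void main(String[] args) {
    // "... left outer join c on a.id = b.id and b.bigint_col = c.bigint_col":
    // a.id = b.id references only {a, b} but belongs to the outer join over
    // {a, b, c}, so the inner join over {a, b} must not pick it up.
    System.out.println(canEvalOuterJoinedConjunct(Set.of(0, 1, 2), List.of(0, 1)));  // false
  }
}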

Change-Id: I719e0eeacccad070b1f9509d80aaf761b572add0
Reviewed-on: http://gerrit.cloudera.org:8080/4986
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/12cc5081
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/12cc5081
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/12cc5081

Branch: refs/heads/master
Commit: 12cc5081783e435bbd2e577e8f7666c1ebe7d28a
Parents: 852e272
Author: Alex Behm <al...@cloudera.com>
Authored: Mon Nov 7 17:32:57 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Mon Dec 5 09:31:25 2016 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/Analyzer.java    | 18 ++++++++++--
 .../queries/PlannerTest/outer-joins.test        | 29 ++++++++++++++++++++
 2 files changed, 44 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/12cc5081/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 96ab097..4819342 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -1316,9 +1316,11 @@ public class Analyzer {
       Expr e = globalState_.conjuncts.get(conjunctId);
       Preconditions.checkState(e != null);
       if (!canEvalFullOuterJoinedConjunct(e, nodeTblRefIds) ||
-          !canEvalAntiJoinedConjunct(e, nodeTblRefIds)) {
+          !canEvalAntiJoinedConjunct(e, nodeTblRefIds) ||
+          !canEvalOuterJoinedConjunct(e, nodeTblRefIds)) {
         continue;
       }
+
       if (ojClauseConjuncts != null && !ojClauseConjuncts.contains(conjunctId)) continue;
       result.add(e);
     }
@@ -1326,8 +1328,8 @@ public class Analyzer {
   }
 
   /**
-   * Checks if a conjunct can be evaluated at a node materializing a list of tuple ids
-   * 'tids'.
+   * Returns false if 'e' references a full outer joined tuple and it is incorrect to
+   * evaluate 'e' at a node materializing 'tids'. Returns true otherwise.
    */
   public boolean canEvalFullOuterJoinedConjunct(Expr e, List<TupleId> tids) {
     TableRef fullOuterJoin = getFullOuterJoinRef(e);
@@ -1336,6 +1338,16 @@ public class Analyzer {
   }
 
   /**
+   * Returns false if 'e' originates from an outer-join On-clause and it is incorrect to
+   * evaluate 'e' at a node materializing 'tids'. Returns true otherwise.
+   */
+  public boolean canEvalOuterJoinedConjunct(Expr e, List<TupleId> tids) {
+    TableRef outerJoin = globalState_.ojClauseByConjunct.get(e.getId());
+    if (outerJoin == null) return true;
+    return tids.containsAll(outerJoin.getAllTableRefIds());
+  }
+
+  /**
    * Returns true if predicate 'e' can be correctly evaluated by a tree materializing
    * 'tupleIds', otherwise false:
    * - the predicate needs to be bound by tupleIds

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/12cc5081/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
index 3a39d14..2d5d6cd 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
@@ -890,3 +890,32 @@ PLAN-ROOT SINK
 04:SCAN HDFS [functional.alltypestiny e]
    partitions=4/4 files=4 size=460B
 ====
+# IMPALA-3125: Test that the On-clause predicates from an outer join are assigned to the
+# corresponding outer-join node, even if the predicates do not reference the join rhs.
+select a.id aid, b.id bid, a.int_col aint, b.int_col bint
+from functional.alltypes a
+inner join functional.alltypes b
+  on a.int_col = b.int_col
+left outer join functional.alltypes c
+  on a.id = b.id and b.bigint_col = c.bigint_col
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: b.bigint_col = c.bigint_col
+|  other join predicates: a.id = b.id
+|
+|--02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: b.int_col = a.int_col
+|  runtime filters: RF000 <- a.int_col
+|
+|--00:SCAN HDFS [functional.alltypes a]
+|     partitions=24/24 files=24 size=478.45KB
+|
+01:SCAN HDFS [functional.alltypes b]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> b.int_col
+====


[4/7] incubator-impala git commit: IMPALA-4579: SHOW CREATE VIEW fails for view containing a subquery

Posted by sa...@apache.org.
IMPALA-4579: SHOW CREATE VIEW fails for view containing a subquery

This commit fixes an issue where a SHOW CREATE VIEW statement throws an
analysis error if the view contains a subquery.

Change-Id: I4a89e46a022f0ccec198b6e3e2b30230103831ce
Reviewed-on: http://gerrit.cloudera.org:8080/5333
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1da57019
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1da57019
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1da57019

Branch: refs/heads/master
Commit: 1da57019ade33d85f76bfb86fa64332eea7b04eb
Parents: ff629c2
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Fri Dec 2 14:53:15 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sun Dec 4 08:35:15 2016 +0000

----------------------------------------------------------------------
 .../java/org/apache/impala/analysis/AnalysisContext.java    | 2 +-
 .../queries/QueryTest/show-create-table.test                | 9 +++++++++
 2 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1da57019/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index e20fd34..e720867 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -335,7 +335,7 @@ public class AnalysisContext {
     public Set<TAccessEvent> getAccessEvents() { return analyzer_.getAccessEvents(); }
     public boolean requiresSubqueryRewrite() {
       return analyzer_.containsSubquery() && !(stmt_ instanceof CreateViewStmt)
-          && !(stmt_ instanceof AlterViewStmt);
+          && !(stmt_ instanceof AlterViewStmt) && !(stmt_ instanceof ShowCreateTableStmt);
     }
     public boolean requiresExprRewrite() {
       return isQueryStmt() ||isInsertStmt() || isCreateTableAsSelectStmt()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1da57019/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index 910f2d5..252cd96 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -337,3 +337,12 @@ STORED AS PARQUET
 LOCATION '$$location_uri$$'
 TBLPROPERTIES ()
 ====
+---- CREATE_VIEW
+# Create view that contains a subquery (IMPALA-4579)
+CREATE VIEW view_with_subquery AS SELECT * FROM functional.alltypestiny
+  WHERE id IN (SELECT id FROM functional.alltypesagg);
+---- RESULTS
+CREATE VIEW show_create_table_test_db.view_with_subquery
+AS SELECT * FROM functional.alltypestiny
+WHERE id IN (SELECT id FROM functional.alltypesagg)
+====


[7/7] incubator-impala git commit: Fix typo in DDL statement for loading Kudu table in stress test

Posted by sa...@apache.org.
Fix typo in DDL statement for loading Kudu table in stress test

Change-Id: I198babd2b52bd64c7f0d27c1be945d2fe7f9f55c
Reviewed-on: http://gerrit.cloudera.org:8080/5158
Tested-by: Impala Public Jenkins
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b8b64e11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b8b64e11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b8b64e11

Branch: refs/heads/master
Commit: b8b64e11081500563eca5ba548de814f6974bad3
Parents: 12cc508
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Sun Nov 20 16:59:33 2016 -0800
Committer: Dimitris Tsirogiannis <dt...@cloudera.com>
Committed: Mon Dec 5 18:15:01 2016 +0000

----------------------------------------------------------------------
 testdata/datasets/tpcds/tpcds_kudu_template.sql | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b8b64e11/testdata/datasets/tpcds/tpcds_kudu_template.sql
----------------------------------------------------------------------
diff --git a/testdata/datasets/tpcds/tpcds_kudu_template.sql b/testdata/datasets/tpcds/tpcds_kudu_template.sql
index a147f12..6c0b994 100644
--- a/testdata/datasets/tpcds/tpcds_kudu_template.sql
+++ b/testdata/datasets/tpcds/tpcds_kudu_template.sql
@@ -115,7 +115,7 @@ CREATE TABLE IF NOT EXISTS {target_db_name}.web_sales (
   ws_net_paid_inc_ship DOUBLE,
   ws_net_paid_inc_ship_tax DOUBLE,
   ws_net_profit DOUBLE,
-  PRIMARY KEY (wd_order_number, ws_item_sk)
+  PRIMARY KEY (ws_order_number, ws_item_sk)
 )
 DISTRIBUTE BY HASH (ws_order_number,ws_item_sk) INTO {buckets} BUCKETS
 STORED AS KUDU