Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:31:50 UTC

[03/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java
new file mode 100644
index 0000000..966c264
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.log;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.RandomAccessFileAppender;
+import org.apache.logging.log4j.core.appender.routing.Route;
+import org.apache.logging.log4j.core.appender.routing.Routes;
+import org.apache.logging.log4j.core.appender.routing.RoutingAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.config.Node;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.processor.PluginEntry;
+import org.apache.logging.log4j.core.config.plugins.util.PluginType;
+import org.apache.logging.log4j.core.filter.AbstractFilter;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+
+/**
+ * Divert appender to redirect and filter test operation logs to match the output of the original
+ * CLI qtest results.
+ */
+public final class LogDivertAppenderForTest {
+  private LogDivertAppenderForTest() {
+    // Prevent instantiation
+  }
+
+  /**
+   * A log filter that passes only the PREHOOK:/POSTHOOK: test messages coming from the SessionState logger.
+   */
+  @Plugin(name = "TestFilter", category = "Core", elementType="filter", printObject = true)
+  private static class TestFilter extends AbstractFilter {
+    @Override
+    public Result filter(LogEvent event) {
+      if (event.getLevel().equals(Level.INFO) && "SessionState".equals(event.getLoggerName())) {
+        if (event.getMessage().getFormattedMessage().startsWith("PREHOOK:")
+            || event.getMessage().getFormattedMessage().startsWith("POSTHOOK:")) {
+          return Result.ACCEPT;
+        }
+      }
+      return Result.DENY;
+    }
+
+    @PluginFactory
+    public static TestFilter createFilter() {
+      return new TestFilter();
+    }
+  }
+
+  /**
+   * If HIVE_IN_TEST is set, then programmatically register a routing appender in the Log4j
+   * configuration, which automatically writes the test log of each query to an individual file.
+   * The equivalent property configuration is as follows:
+   *  # queryId based routing file appender
+      appender.test-query-routing.type = Routing
+      appender.test-query-routing.name = test-query-routing
+      appender.test-query-routing.routes.type = Routes
+      appender.test-query-routing.routes.pattern = $${ctx:queryId}
+      # default route
+      appender.test-query-routing.routes.test-route-default.type = Route
+      appender.test-query-routing.routes.test-route-default.key = $${ctx:queryId}
+      appender.test-query-routing.routes.test-route-default.app.type = NullAppender
+      appender.test-query-routing.routes.test-route-default.app.name = test-null-appender
+      # queryId based route
+      appender.test-query-routing.routes.test-route-mdc.type = Route
+      appender.test-query-routing.routes.test-route-mdc.name = test-query-routing
+      appender.test-query-routing.routes.test-route-mdc.app.type = RandomAccessFile
+      appender.test-query-routing.routes.test-route-mdc.app.name = test-query-file-appender
+      appender.test-query-routing.routes.test-route-mdc.app.fileName = ${sys:hive.log.dir}/${ctx:sessionId}/${ctx:queryId}.test
+      appender.test-query-routing.routes.test-route-mdc.app.layout.type = PatternLayout
+      appender.test-query-routing.routes.test-route-mdc.app.layout.pattern = %d{ISO8601} %5p %c{2}: %m%n
+      appender.test-query-routing.routes.test-route-mdc.app.filter.type = TestFilter
+   * @param conf the configuration for HiveServer2 instance
+   */
+  public static void registerRoutingAppenderIfInTest(org.apache.hadoop.conf.Configuration conf) {
+    if (!conf.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname,
+        HiveConf.ConfVars.HIVE_IN_TEST.defaultBoolVal)) {
+      // If not in test mode, then do not create the appender
+      return;
+    }
+
+    String logLocation =
+        HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION);
+
+    // Create test-null-appender to drop events without queryId
+    PluginEntry nullAppenderEntry = new PluginEntry();
+    nullAppenderEntry.setClassName(NullAppender.class.getName());
+    PluginType<NullAppender> nullAppenderType =
+        new PluginType<NullAppender>(nullAppenderEntry, NullAppender.class, "appender");
+    Node nullAppenderChildNode = new Node(null, "test-null-appender", nullAppenderType);
+
+    // Create default route where events go without queryId
+    PluginEntry defaultRouteEntry = new PluginEntry();
+    defaultRouteEntry.setClassName(Route.class.getName());
+    PluginType<Route> defaultRouteType = new PluginType<Route>(defaultRouteEntry, Route.class, "");
+    Node defaultRouteNode = new Node(null, "test-route-default", defaultRouteType);
+    // Add the test-null-appender to the default route
+    defaultRouteNode.getChildren().add(nullAppenderChildNode);
+
+    // Create queryId based route
+    PluginEntry queryIdRouteEntry = new PluginEntry();
+    queryIdRouteEntry.setClassName(Route.class.getName());
+    PluginType<Route> queryIdRouteType = new PluginType<Route>(queryIdRouteEntry, Route.class, "");
+    Node queryIdRouteNode = new Node(null, "test-route-mdc", queryIdRouteType);
+
+    // Create the queryId appender for the queryId route
+    PluginEntry queryIdAppenderEntry = new PluginEntry();
+    queryIdAppenderEntry.setClassName(RandomAccessFileAppender.class.getName());
+    PluginType<RandomAccessFileAppender> queryIdAppenderType =
+        new PluginType<RandomAccessFileAppender>(queryIdAppenderEntry,
+            RandomAccessFileAppender.class, "appender");
+    Node queryIdAppenderNode =
+        new Node(queryIdRouteNode, "test-query-file-appender", queryIdAppenderType);
+    queryIdAppenderNode.getAttributes().put("fileName", logLocation
+        + "/${ctx:sessionId}/${ctx:queryId}.test");
+    queryIdAppenderNode.getAttributes().put("name", "test-query-file-appender");
+    // Add the queryId appender to the queryId based route
+    queryIdRouteNode.getChildren().add(queryIdAppenderNode);
+
+    // Create the filter for the queryId appender
+    PluginEntry filterEntry = new PluginEntry();
+    filterEntry.setClassName(TestFilter.class.getName());
+    PluginType<TestFilter> filterType =
+        new PluginType<TestFilter>(filterEntry, TestFilter.class, "");
+    Node filterNode = new Node(queryIdAppenderNode, "test-filter", filterType);
+    // Add the filter to the queryId appender
+    queryIdAppenderNode.getChildren().add(filterNode);
+
+    // Create the layout for the queryId appender
+    PluginEntry layoutEntry = new PluginEntry();
+    layoutEntry.setClassName(PatternLayout.class.getName());
+    PluginType<PatternLayout> layoutType =
+        new PluginType<PatternLayout>(layoutEntry, PatternLayout.class, "");
+    Node layoutNode = new Node(queryIdAppenderNode, "PatternLayout", layoutType);
+    layoutNode.getAttributes().put("pattern", LogDivertAppender.nonVerboseLayout);
+    // Add the layout to the queryId appender
+    queryIdAppenderNode.getChildren().add(layoutNode);
+
+    // Create the route objects based on the Nodes
+    Route defaultRoute = Route.createRoute(null, "${ctx:queryId}", defaultRouteNode);
+    Route mdcRoute = Route.createRoute(null, null, queryIdRouteNode);
+    // Create the routes group
+    Routes routes = Routes.createRoutes("${ctx:queryId}", defaultRoute, mdcRoute);
+
+    LoggerContext context = (LoggerContext)LogManager.getContext(false);
+    Configuration configuration = context.getConfiguration();
+
+    // Create the appender
+    RoutingAppender routingAppender = RoutingAppender.createAppender("test-query-routing",
+        "true",
+        routes,
+        configuration,
+        null,
+        null,
+        null);
+
+    LoggerConfig loggerConfig = configuration.getRootLogger();
+    loggerConfig.addAppender(routingAppender, null, null);
+    context.updateLoggers();
+    routingAppender.start();
+  }
+}
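
The registration above is driven by HiveConf and the Log4j2 thread context. A minimal usage sketch, not taken from this patch; the explicit ThreadContext calls and the queryId/sessionId values are illustrative assumptions:

  HiveConf conf = new HiveConf();
  conf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, true);
  LogDivertAppenderForTest.registerRoutingAppenderIfInTest(conf);

  // Each query sets the routing keys before logging:
  org.apache.logging.log4j.ThreadContext.put("sessionId", "test_session");
  org.apache.logging.log4j.ThreadContext.put("queryId", "query_0001");
  // INFO events from the "SessionState" logger starting with "PREHOOK:" or
  // "POSTHOOK:" are then routed to
  //   <hive.server2.logging.operation.log.location>/test_session/query_0001.test
  // Events without a queryId fall through to the NullAppender route.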

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index ea87cb4..de68876 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -697,6 +697,11 @@ public class Hive {
       throws InvalidOperationException, HiveException {
     try {
       validatePartition(newPart);
+      String location = newPart.getLocation();
+      if (location != null && !Utilities.isDefaultNameNode(conf)) {
+        location = Utilities.getQualifiedPath(conf, new Path(location));
+        newPart.setLocation(location);
+      }
       getMSC().alter_partition(dbName, tblName, newPart.getTPartition(), environmentContext);
 
     } catch (MetaException e) {
@@ -736,6 +741,11 @@ public class Hive {
         if (tmpPart.getParameters() != null) {
           tmpPart.getParameters().remove(hive_metastoreConstants.DDL_TIME);
         }
+        String location = tmpPart.getLocation();
+        if (location != null && !Utilities.isDefaultNameNode(conf)) {
+          location = Utilities.getQualifiedPath(conf, new Path(location));
+          tmpPart.setLocation(location);
+        }
         newTParts.add(tmpPart.getTPartition());
       }
       getMSC().alter_partitions(names[0], names[1], newTParts, environmentContext);
@@ -1208,6 +1218,27 @@ public class Hive {
     }
   }
 
+
+
+  /**
+   * Truncates the table/partition as per the specification; only the data files are trashed.
+   *
+   * @param dbDotTableName
+   *          name of the table
+   * @param partSpec
+   *          partition specification; if null, the whole table is truncated
+   * @throws HiveException
+   */
+  public void truncateTable(String dbDotTableName, Map<String, String> partSpec) throws HiveException {
+    try {
+      Table table = getTable(dbDotTableName, true);
+
+      List<String> partNames = ((null == partSpec)
+                       ? null : getPartitionNames(table.getDbName(), table.getTableName(), partSpec, (short) -1));
+      getMSC().truncateTable(table.getDbName(), table.getTableName(), partNames);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
   public HiveConf getConf() {
     return (conf);
   }
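
A minimal usage sketch for the new truncateTable() API above; the database, table and partition values are illustrative, and Hive.get() is the existing factory method:

  Hive db = Hive.get(new HiveConf());
  // Truncate the whole table:
  db.truncateTable("default.web_logs", null);
  // Truncate only the partitions matching a (partial) spec:
  Map<String, String> partSpec = new HashMap<>();
  partSpec.put("ds", "2017-05-05");
  db.truncateTable("default.web_logs", partSpec);
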
@@ -1413,7 +1444,6 @@ public class Hive {
    */
   public List<String> getTablesByType(String dbName, String pattern, TableType type)
       throws HiveException {
-    List<String> retList = new ArrayList<String>();
     if (dbName == null)
       dbName = SessionState.get().getCurrentDatabase();
 
@@ -1578,7 +1608,20 @@ public class Hive {
     return getDatabase(currentDb);
   }
 
-  public void loadSinglePartition(Path loadPath, String tableName,
+  /**
+   * @param loadPath
+   * @param tableName
+   * @param partSpec
+   * @param replace
+   * @param inheritTableSpecs
+   * @param isSkewedStoreAsSubdir
+   * @param isSrcLocal
+   * @param isAcid
+   * @param hasFollowingStatsTask
+   * @return
+   * @throws HiveException
+   */
+  public void loadPartition(Path loadPath, String tableName,
       Map<String, String> partSpec, boolean replace, boolean inheritTableSpecs,
       boolean isSkewedStoreAsSubdir,  boolean isSrcLocal, boolean isAcid,
       boolean hasFollowingStatsTask, Long mmWriteId, boolean isCommitMmWrite)
@@ -1594,7 +1637,6 @@ public class Hive {
     }
   }
 
-
   public void commitMmTableWrite(Table tbl, Long mmWriteId)
       throws HiveException {
     try {
@@ -1623,7 +1665,11 @@ public class Hive {
    *          location/inputformat/outputformat/serde details from table spec
    * @param isSrcLocal
    *          If the source directory is LOCAL
-   * @param isAcid true if this is an ACID operation
+   * @param isAcid
+   *          true if this is an ACID operation
+   * @param hasFollowingStatsTask
+   *          true if there is a following task which updates the stats, so this method need not update them.
+   * @return Partition object being loaded with data
    */
   public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
       boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
@@ -1631,6 +1677,7 @@ public class Hive {
           throws HiveException {
     Path tblDataLocationPath =  tbl.getDataLocation();
     try {
+      // Get the partition object if it already exists
       Partition oldPart = getPartition(tbl, partSpec, false);
       /**
        * Move files before creating the partition since down stream processes
@@ -1668,6 +1715,12 @@ public class Hive {
       List<Path> newFiles = null;
       PerfLogger perfLogger = SessionState.getPerfLogger();
       perfLogger.PerfLogBegin("MoveTask", "FileMoves");
+      // If config is set, table is not temporary and partition being inserted exists, capture
+      // the list of files added. For not yet existing partitions (insert overwrite to new partition
+      // or dynamic partition inserts), the add partition event will capture the list of files added.
+      if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) {
+        newFiles = Collections.synchronizedList(new ArrayList<Path>());
+      }
       // TODO: this assumes both paths are qualified; which they are, currently.
       if (mmWriteId != null && loadPath.equals(newPartPath)) {
         // MM insert query, move itself is a no-op.
@@ -1678,7 +1731,8 @@ public class Hive {
         }
         Utilities.LOG14535.info("maybe deleting stuff from " + oldPartPath + " (new " + newPartPath + ") for replace");
         if (replace && oldPartPath != null) {
-          deleteOldPathForReplace(newPartPath, oldPartPath, getConf(),
+          boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+          deleteOldPathForReplace(newPartPath, oldPartPath, getConf(), isAutoPurge,
               new ValidWriteIds.IdPathFilter(mmWriteId, false, true), mmWriteId != null,
               tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0);
         }
@@ -1693,13 +1747,10 @@ public class Hive {
         }
         Utilities.LOG14535.info("moving " + loadPath + " to " + destPath);
         if (replace || (oldPart == null && !isAcid)) {
+          boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
           replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(),
-              isSrcLocal, filter, mmWriteId != null);
+              isSrcLocal, isAutoPurge, newFiles, filter, mmWriteId != null);
         } else {
-          if (areEventsForDmlNeeded(tbl, oldPart)) {
-            newFiles = Collections.synchronizedList(new ArrayList<Path>());
-          }
-
           FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
           Hive.copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid, newFiles);
         }
@@ -1708,13 +1759,17 @@ public class Hive {
       Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
       alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
       validatePartition(newTPart);
-      if ((null != newFiles) || replace) {
-        fireInsertEvent(tbl, partSpec, newFiles);
+
+      // Generate an insert event only if inserting into an existing partition
+      // When inserting into a new partition, the add partition event takes care of insert event
+      if ((null != oldPart) && (null != newFiles)) {
+        fireInsertEvent(tbl, partSpec, replace, newFiles);
       } else {
-        LOG.debug("No new files were created, and is not a replace. Skipping generating INSERT event.");
+        LOG.debug("No new files were created, and is not a replace, or we're inserting into a "
+                + "partition that does not exist yet. Skipping generating INSERT event.");
       }
 
-      //column stats will be inaccurate
+      // column stats will be inaccurate
       StatsSetupConst.clearColumnStatsState(newTPart.getParameters());
 
       // recreate the partition if it existed before
@@ -2158,7 +2213,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
       Utilities.LOG14535.info("not moving " + loadPath + " to " + tbl.getPath());
       if (replace) {
         Path tableDest = tbl.getPath();
-        deleteOldPathForReplace(tableDest, tableDest, sessionConf,
+        boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+        deleteOldPathForReplace(tableDest, tableDest, sessionConf, isAutopurge,
             new ValidWriteIds.IdPathFilter(mmWriteId, false, true), mmWriteId != null,
             tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0);
       }
@@ -2174,8 +2230,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
       }
       Utilities.LOG14535.info("moving " + loadPath + " to " + tblPath + " (replace = " + replace + ")");
       if (replace) {
+        boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
         replaceFiles(tblPath, loadPath, destPath, tblPath,
-            sessionConf, isSrcLocal, filter, mmWriteId != null);
+            sessionConf, isSrcLocal, isAutopurge, newFiles, filter, mmWriteId != null);
       } else {
         try {
           FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf);
@@ -2221,7 +2278,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       commitMmTableWrite(tbl, mmWriteId);
     }
 
-    fireInsertEvent(tbl, null, newFiles);
+    fireInsertEvent(tbl, null, replace, newFiles);
   }
 
   /**
@@ -2446,7 +2503,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         }
         else {
           alterPartitionSpec(tbl, partSpec, tpart, inheritTableSpecs, partPath);
-          fireInsertEvent(tbl, partSpec, newFiles);
+          fireInsertEvent(tbl, partSpec, true, newFiles);
         }
       }
       if (tpart == null) {
@@ -2496,7 +2553,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     tpart.getSd().setLocation(partPath);
   }
 
-  private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, List<Path> newFiles)
+  private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, boolean replace, List<Path> newFiles)
       throws HiveException {
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
       LOG.debug("Firing dml insert event");
@@ -2508,6 +2565,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf);
         FireEventRequestData data = new FireEventRequestData();
         InsertEventRequestData insertData = new InsertEventRequestData();
+        insertData.setReplace(replace);
         data.setInsertData(insertData);
         if (newFiles != null && newFiles.size() > 0) {
           for (Path p : newFiles) {
@@ -3053,8 +3111,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
     if (!fullDestStatus.getFileStatus().isDirectory()) {
       throw new HiveException(destf + " is not a directory.");
     }
-    final boolean inheritPerms = HiveConf.getBoolVar(conf,
-        HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     final List<Future<ObjectPair<Path, Path>>> futures = new LinkedList<>();
     final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
         Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
@@ -3081,15 +3137,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
         // If we do a rename for a non-local file, we will be transferring the original
         // file permissions from source to the destination. Else, in case of mvFile() where we
         // copy from source to destination, we will inherit the destination's parent group ownership.
-        final String srcGroup = isRenameAllowed ? srcFile.getGroup() :
-          fullDestStatus.getFileStatus().getGroup();
         if (null == pool) {
           try {
             Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isRenameAllowed);
 
-            if (inheritPerms) {
-              HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false);
-            }
             if (null != newFiles) {
               newFiles.add(destPath);
             }
@@ -3104,9 +3155,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
               SessionState.setCurrentSessionState(parentSession);
 
               Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isRenameAllowed);
-              if (inheritPerms) {
-                HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false);
-              }
+
               if (null != newFiles) {
                 newFiles.add(destPath);
               }
@@ -3116,11 +3165,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         }
       }
     }
-    if (null == pool) {
-      if (inheritPerms) {
-        HdfsUtils.setFullFileStatus(conf, fullDestStatus, null, destFs, destf, true);
-      }
-    } else {
+    if (null != pool) {
       pool.shutdown();
       for (Future<ObjectPair<Path, Path>> future : futures) {
         try {
@@ -3178,11 +3223,34 @@ private void constructOneLBLocationMap(FileStatus fSta,
     return ShimLoader.getHadoopShims().getPathWithoutSchemeAndAuthority(path);
   }
 
+  /**
+   * <p>
+   *   Moves a file from one {@link Path} to another. If {@code isRenameAllowed} is true then the
+   *   {@link FileSystem#rename(Path, Path)} method is used to move the file. If it is false then the data is copied: if
+   *   {@code isSrcLocal} is true then the {@link FileSystem#copyFromLocalFile(Path, Path)} method is used, else
+   *   {@link FileUtils#copy(FileSystem, Path, FileSystem, Path, boolean, boolean, HiveConf)} is used.
+   * </p>
+   *
+   * <p>
+   *   If the destination file already exists, then {@code _copy_[counter]} is appended to the file name, where counter
+   *   is an integer starting from 1.
+   * </p>
+   *
+   * @param conf the {@link HiveConf} to use if copying data
+   * @param sourceFs the {@link FileSystem} where the source file exists
+   * @param sourcePath the {@link Path} to move
+   * @param destFs the {@link FileSystem} to move the file to
+   * @param destDirPath the {@link Path} to move the file to
+   * @param isSrcLocal if the source file is on the local filesystem
+   * @param isRenameAllowed true if the data should be renamed and not copied, false otherwise
+   *
+   * @return the {@link Path} the source file was moved to
+   *
+   * @throws IOException if there was an issue moving the file
+   */
   private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, FileSystem destFs, Path destDirPath,
                              boolean isSrcLocal, boolean isRenameAllowed) throws IOException {
 
-    boolean isBlobStoragePath = BlobStorageUtils.isBlobStoragePath(conf, destDirPath);
-
     // Strip off the file type, if any so we don't make:
     // 000000_0.gz -> 000000_0.gz_copy_1
     final String fullname = sourcePath.getName();
@@ -3192,27 +3260,19 @@ private void constructOneLBLocationMap(FileStatus fSta,
     Path destFilePath = new Path(destDirPath, fullname);
 
     /*
-       * The below loop may perform bad when the destination file already exists and it has too many _copy_
-       * files as well. A desired approach was to call listFiles() and get a complete list of files from
-       * the destination, and check whether the file exists or not on that list. However, millions of files
-       * could live on the destination directory, and on concurrent situations, this can cause OOM problems.
-       *
-       * I'll leave the below loop for now until a better approach is found.
-       */
-
-    int counter = 1;
-    if (!isRenameAllowed || isBlobStoragePath) {
-      while (destFs.exists(destFilePath)) {
-        destFilePath =  new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
-        counter++;
-      }
+    * The below loop may perform badly when the destination file already exists and it has too many _copy_
+    * files as well. A desired approach was to call listFiles() and get a complete list of files from
+    * the destination, and check whether the file exists or not on that list. However, millions of files
+    * could live in the destination directory, and in concurrent situations, this can cause OOM problems.
+    *
+    * I'll leave the below loop for now until a better approach is found.
+    */
+    for (int counter = 1; destFs.exists(destFilePath); counter++) {
+      destFilePath =  new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
     }
 
     if (isRenameAllowed) {
-      while (!destFs.rename(sourcePath, destFilePath)) {
-        destFilePath =  new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
-        counter++;
-      }
+      destFs.rename(sourcePath, destFilePath);
     } else if (isSrcLocal) {
       destFs.copyFromLocalFile(sourcePath, destFilePath);
     } else {
@@ -3221,7 +3281,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
           false,  // overwrite destination
           conf);
     }
-
     return destFilePath;
   }
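
The _copy_ collision handling described in the mvFile() javadoc above amounts to probing the destination until a free name is found. A standalone sketch under the same assumptions; FilenameUtils is Apache Commons IO and the helper name is hypothetical:

  static Path nextFreeName(FileSystem destFs, Path destDir, String fullname) throws IOException {
    String name = FilenameUtils.getBaseName(fullname);   // "000000_0" for "000000_0.gz"
    String type = FilenameUtils.getExtension(fullname);  // "gz"
    Path candidate = new Path(destDir, fullname);
    for (int counter = 1; destFs.exists(candidate); counter++) {
      candidate = new Path(destDir,
          name + "_copy_" + counter + (type.isEmpty() ? "" : "." + type));
    }
    return candidate;                                     // e.g. 000000_0_copy_1.gz
  }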
 
@@ -3250,12 +3309,30 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
   }
 
+  // List the new files in the destination path which were copied from the source.
+  public static void listNewFilesRecursively(final FileSystem destFs, Path dest,
+                                             List<Path> newFiles) throws HiveException {
+    try {
+      for (FileStatus fileStatus : destFs.listStatus(dest, FileUtils.HIDDEN_FILES_PATH_FILTER)) {
+        if (fileStatus.isDirectory()) {
+          // If it is a sub-directory, then recursively list the files.
+          listNewFilesRecursively(destFs, fileStatus.getPath(), newFiles);
+        } else {
+          newFiles.add(fileStatus.getPath());
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to get source file statuses", e);
+      throw new HiveException(e.getMessage(), e);
+    }
+  }
+
   //it is assumed that parent directory of the destf should already exist when this
   //method is called. when the replace value is true, this method works a little different
   //from mv command if the destf is a directory, it replaces the destf instead of moving under
   //the destf. in this case, the replaced destf still preserves the original destf's permission
-  public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf,
-      boolean replace, boolean isSrcLocal) throws HiveException {
+  public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, boolean replace,
+                                 boolean isSrcLocal) throws HiveException {
     final FileSystem srcFs, destFs;
     try {
       destFs = destf.getFileSystem(conf);
@@ -3266,13 +3343,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
     try {
       srcFs = srcf.getFileSystem(conf);
     } catch (IOException e) {
-      LOG.error("Failed to get dest fs", e);
+      LOG.error("Failed to get src fs", e);
       throw new HiveException(e.getMessage(), e);
     }
 
-    //needed for perm inheritance.
-    final boolean inheritPerms = HiveConf.getBoolVar(conf,
-        HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     HdfsUtils.HadoopFileStatus destStatus = null;
 
     // If source path is a subdirectory of the destination path (or the other way around):
@@ -3284,7 +3358,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     boolean srcIsSubDirOfDest = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal),
         destIsSubDirOfSrc = isSubDir(destf, srcf, destFs, srcFs, false);
     try {
-      if (inheritPerms || replace) {
+      if (replace) {
         try{
           destStatus = new HdfsUtils.HadoopFileStatus(conf, destFs, destf);
           //if destf is an existing directory:
@@ -3297,10 +3371,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
             LOG.debug("The path " + destf.toString() + " is deleted");
           }
         } catch (FileNotFoundException ignore) {
-          //if dest dir does not exist, any re
-          if (inheritPerms) {
-            destStatus = new HdfsUtils.HadoopFileStatus(conf, destFs, destf.getParent());
-          }
         }
       }
       final HdfsUtils.HadoopFileStatus desiredStatus = destStatus;
@@ -3308,9 +3378,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
       if (isSrcLocal) {
         // For local src file, copy to hdfs
         destFs.copyFromLocalFile(srcf, destf);
-        if (inheritPerms) {
-          HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true);
-        }
         return true;
       } else {
         if (needToCopy(srcf, destf, srcFs, destFs)) {
@@ -3347,11 +3414,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
                   public Void call() throws Exception {
                     SessionState.setCurrentSessionState(parentSession);
                     final String group = srcStatus.getGroup();
-                    if(destFs.rename(srcStatus.getPath(), destFile)) {
-                      if (inheritPerms) {
-                        HdfsUtils.setFullFileStatus(conf, desiredStatus, group, destFs, destFile, false);
-                      }
-                    } else {
+                    if(!destFs.rename(srcStatus.getPath(), destFile)) {
                       throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest path:"
                           + destFile + " returned false");
                     }
@@ -3360,11 +3423,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
                 }));
               }
             }
-            if (null == pool) {
-              if (inheritPerms) {
-                HdfsUtils.setFullFileStatus(conf, desiredStatus, null, destFs, destf, true);
-              }
-            } else {
+            if (null != pool) {
               pool.shutdown();
               for (Future<Void> future : futures) {
                 try {
@@ -3379,9 +3438,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
             return true;
           } else {
             if (destFs.rename(srcf, destf)) {
-              if (inheritPerms) {
-                HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true);
-              }
               return true;
             }
             return false;
@@ -3432,12 +3488,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
    */
   static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
       FileSystem fs, boolean isSrcLocal, boolean isAcid, List<Path> newFiles) throws HiveException {
-    boolean inheritPerms = HiveConf.getBoolVar(conf,
-        HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     try {
       // create the destination if it does not exist
       if (!fs.exists(destf)) {
-        FileUtils.mkdir(fs, destf, inheritPerms, conf);
+        FileUtils.mkdir(fs, destf, conf);
       }
     } catch (IOException e) {
       throw new HiveException(
@@ -3571,11 +3625,16 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param oldPath
    *          The directory where the old data location, need to be cleaned up.  Most of time, will be the same
    *          as destf, unless its across FileSystem boundaries.
+   * @param purge
+   *          When set to true, files which need to be deleted are not moved to the Trash
    * @param isSrcLocal
    *          If the source directory is LOCAL
+   * @param newFiles
+   *          Output the list of files newly placed in the destination path
    */
   protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
-          boolean isSrcLocal, PathFilter deletePathFilter, boolean isMmTable) throws HiveException {
+          boolean isSrcLocal, boolean purge, List<Path> newFiles, PathFilter deletePathFilter,
+          boolean isMmTable) throws HiveException {
     try {
 
       FileSystem destFs = destf.getFileSystem(conf);
@@ -3596,14 +3655,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
       if (oldPath != null) {
         // TODO: we assume lbLevels is 0 here. Same as old code for non-MM.
         //       For MM tables, this can only be a LOAD command. Does LOAD even support LB?
-        deleteOldPathForReplace(destf, oldPath, conf, deletePathFilter, isMmTable, 0);
+        deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, isMmTable, 0);
       }
 
       // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates
-      // destf with inherited permissions
-      boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars
-          .HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
-      boolean destfExist = FileUtils.mkdir(destFs, destf, inheritPerms, conf);
+      // destf
+      boolean destfExist = FileUtils.mkdir(destFs, destf, conf);
       if(!destfExist) {
         throw new IOException("Directory " + destf.toString()
             + " does not exist and could not be created.");
@@ -3619,11 +3676,23 @@ private void constructOneLBLocationMap(FileStatus fSta,
         if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal)) {
           throw new IOException("Error moving: " + srcf + " into: " + destf);
         }
-      } else { // its either a file or glob
+
+        // Add file paths of the files that will be moved to the destination if the caller needs it
+        if (null != newFiles) {
+          listNewFilesRecursively(destFs, destf, newFiles);
+        }
+      } else {
+        // it's either a file or a glob
         for (FileStatus src : srcs) {
-          if (!moveFile(conf, src.getPath(), new Path(destf, src.getPath().getName()), true, isSrcLocal)) {
+          Path destFile = new Path(destf, src.getPath().getName());
+          if (!moveFile(conf, src.getPath(), destFile, true, isSrcLocal)) {
             throw new IOException("Error moving: " + srcf + " into: " + destf);
           }
+
+          // Add file paths of the files that will be moved to the destination if the caller needs it
+          if (null != newFiles) {
+            newFiles.add(destFile);
+          }
         }
       }
     } catch (IOException e) {
@@ -3631,7 +3700,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
   }
 
-  private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf,
+  private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, boolean purge,
       PathFilter pathFilter, boolean isMmTable, int lbLevels) throws HiveException {
     Utilities.LOG14535.info("Deleting old paths for replace in " + destPath + " and old path " + oldPath);
     boolean isOldPathUnderDestf = false;
@@ -3645,7 +3714,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       isOldPathUnderDestf = isSubDir(oldPath, destPath, oldFs, destFs, false);
       if (isOldPathUnderDestf || isMmTable) {
         if (lbLevels == 0 || !isMmTable) {
-          cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf);
+          cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf, purge);
         } else {
           // We need to clean up different MM IDs from each LB directory separately.
           // Avoid temporary directories in the immediate table/part dir.
@@ -3663,7 +3732,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
               throw new HiveException("Unexpected path during overwrite: " + lbPath);
             }
             Utilities.LOG14535.info("Cleaning up LB directory " + lbPath);
-            cleanUpOneDirectoryForReplace(lbPath, oldFs, pathFilter, conf);
+            cleanUpOneDirectoryForReplace(lbPath, oldFs, pathFilter, conf, purge);
           }
         }
       }
@@ -3681,7 +3750,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
 
   private void cleanUpOneDirectoryForReplace(Path path, FileSystem fs,
-      PathFilter pathFilter, HiveConf conf) throws IOException, HiveException {
+      PathFilter pathFilter, HiveConf conf, boolean purge) throws IOException, HiveException {
     FileStatus[] statuses = fs.listStatus(path, pathFilter);
     if (statuses == null || statuses.length == 0) return;
     String s = "Deleting files under " + path + " for replace: ";
@@ -3689,7 +3758,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       s += file.getPath().getName() + ", ";
     }
     Utilities.LOG14535.info(s);
-    if (!trashFiles(fs, statuses, conf)) {
+    if (!trashFiles(fs, statuses, conf, purge)) {
       throw new HiveException("Old path " + path + " has not been cleaned up.");
     }
   }
@@ -3703,7 +3772,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @return true if deletion successful
    * @throws IOException
    */
-  public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuses, final Configuration conf)
+  public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuses,
+      final Configuration conf, final boolean purge)
       throws IOException {
     boolean result = true;
 
@@ -3717,13 +3787,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
     final SessionState parentSession = SessionState.get();
     for (final FileStatus status : statuses) {
       if (null == pool) {
-        result &= FileUtils.moveToTrash(fs, status.getPath(), conf);
+        result &= FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
       } else {
         futures.add(pool.submit(new Callable<Boolean>() {
           @Override
           public Boolean call() throws Exception {
             SessionState.setCurrentSessionState(parentSession);
-            return FileUtils.moveToTrash(fs, status.getPath(), conf);
+            return FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
           }
         }));
       }
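
The purge flag threaded through replaceFiles(), deleteOldPathForReplace() and trashFiles() above decides whether old files skip the Trash. A minimal sketch of how a caller ends up exercising it; the table and path objects are illustrative:

  FileSystem fs = oldPath.getFileSystem(conf);
  FileStatus[] statuses = fs.listStatus(oldPath);
  // auto.purge=true on the table means: delete outright, do not move to the Trash
  boolean purge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
  if (!Hive.trashFiles(fs, statuses, conf, purge)) {
    throw new HiveException("Old path " + oldPath + " has not been cleaned up.");
  }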

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
index 1d78b4c..b121eea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.calcite.adapter.druid.DruidQuery;
 import org.apache.calcite.adapter.druid.DruidSchema;
 import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.calcite.adapter.druid.LocalInterval;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptMaterialization;
@@ -310,7 +311,7 @@ public final class HiveMaterializedViewsRegistry {
         }
         metrics.add(field.getName());
       }
-      List<Interval> intervals = Arrays.asList(DruidTable.DEFAULT_INTERVAL);
+      List<LocalInterval> intervals = Arrays.asList(DruidTable.DEFAULT_INTERVAL);
 
       DruidTable druidTable = new DruidTable(new DruidSchema(address, address, false),
           dataSource, RelDataTypeImpl.proto(rowType), metrics, DruidTable.DEFAULT_TIMESTAMP_COLUMN, intervals);

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 6805c17..4add836 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.metadata;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -35,7 +36,10 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import com.google.common.collect.Sets;
+import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -49,8 +53,6 @@ import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult;
-import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
-import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.thrift.TException;
 
 import com.google.common.util.concurrent.MoreExecutors;
@@ -64,6 +66,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 public class HiveMetaStoreChecker {
 
   public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreChecker.class);
+  public static final String CLASS_NAME = HiveMetaStoreChecker.class.getName();
 
   private final Hive hive;
   private final HiveConf conf;
@@ -208,19 +211,28 @@ public class HiveMetaStoreChecker {
       return;
     }
 
-    List<Partition> parts = new ArrayList<Partition>();
+    PartitionIterable parts;
     boolean findUnknownPartitions = true;
 
     if (table.isPartitioned()) {
       if (partitions == null || partitions.isEmpty()) {
-        PrunedPartitionList prunedPartList =
-        PartitionPruner.prune(table, null, conf, toString(), null);
-        // no partitions specified, let's get all
-        parts.addAll(prunedPartList.getPartitions());
+        String mode = HiveConf.getVar(conf, ConfVars.HIVEMAPREDMODE, (String) null);
+        if ("strict".equalsIgnoreCase(mode)) {
+          parts = new PartitionIterable(hive, table, null, conf.getIntVar(
+              HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
+        } else {
+          List<Partition> loadedPartitions = new ArrayList<>();
+          PerfLogger perfLogger = SessionState.getPerfLogger();
+          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARTITION_RETRIEVING);
+          loadedPartitions.addAll(hive.getAllPartitionsOf(table));
+          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARTITION_RETRIEVING);
+          parts = new PartitionIterable(loadedPartitions);
+        }
       } else {
         // we're interested in specific partitions,
         // don't check for any others
         findUnknownPartitions = false;
+        List<Partition> loadedPartitions = new ArrayList<>();
         for (Map<String, String> map : partitions) {
           Partition part = hive.getPartition(table, map, false);
           if (part == null) {
@@ -229,10 +241,13 @@ public class HiveMetaStoreChecker {
             pr.setPartitionName(Warehouse.makePartPath(map));
             result.getPartitionsNotInMs().add(pr);
           } else {
-            parts.add(part);
+            loadedPartitions.add(part);
           }
         }
+        parts = new PartitionIterable(loadedPartitions);
       }
+    } else {
+      parts = new PartitionIterable(Collections.<Partition>emptyList());
     }
 
     checkTable(table, parts, findUnknownPartitions, result);
@@ -255,7 +270,7 @@ public class HiveMetaStoreChecker {
    * @throws HiveException
    *           Could not create Partition object
    */
-  void checkTable(Table table, List<Partition> parts,
+  void checkTable(Table table, PartitionIterable parts,
       boolean findUnknownPartitions, CheckResult result) throws IOException,
       HiveException {
 
@@ -284,7 +299,9 @@ public class HiveMetaStoreChecker {
       }
 
       for (int i = 0; i < partition.getSpec().size(); i++) {
-        partPaths.add(partPath.makeQualified(fs));
+        Path qualifiedPath = partPath.makeQualified(fs);
+        StringInternUtils.internUriStringsInPath(qualifiedPath);
+        partPaths.add(qualifiedPath);
         partPath = partPath.getParent();
       }
     }
@@ -314,7 +331,7 @@ public class HiveMetaStoreChecker {
     // now check the table folder and see if we find anything
     // that isn't in the metastore
     Set<Path> allPartDirs = new HashSet<Path>();
-    checkPartitionDirs(tablePath, allPartDirs, table.getPartCols().size());
+    checkPartitionDirs(tablePath, allPartDirs, Collections.unmodifiableList(table.getPartColNames()));
     // don't want the table dir
     allPartDirs.remove(tablePath);
 
@@ -398,14 +415,14 @@ public class HiveMetaStoreChecker {
    *          Start directory
    * @param allDirs
    *          This set will contain the leaf paths at the end.
-   * @param maxDepth
+   * @param partColNames
    *          Specify how deep the search goes.
    * @throws IOException
    *           Thrown if we can't get lists from the fs.
    * @throws HiveException
    */
 
-  private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth) throws IOException, HiveException {
+  private void checkPartitionDirs(Path basePath, Set<Path> allDirs, final List<String> partColNames) throws IOException, HiveException {
     // Here we just reuse the THREAD_COUNT configuration for
     // METASTORE_FS_HANDLER_THREADS_COUNT since this results in better performance
     // The number of missing partitions discovered are later added by metastore using a
@@ -423,21 +440,21 @@ public class HiveMetaStoreChecker {
           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build();
       executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, threadFactory);
     }
-    checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), maxDepth);
+    checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), partColNames);
 
     executor.shutdown();
   }
 
   private final class PathDepthInfoCallable implements Callable<Path> {
-    private final int maxDepth;
+    private final List<String> partColNames;
     private final FileSystem fs;
     private final ConcurrentLinkedQueue<PathDepthInfo> pendingPaths;
     private final boolean throwException;
     private final PathDepthInfo pd;
 
-    private PathDepthInfoCallable(PathDepthInfo pd, int maxDepth, FileSystem fs,
+    private PathDepthInfoCallable(PathDepthInfo pd, List<String> partColNames, FileSystem fs,
         ConcurrentLinkedQueue<PathDepthInfo> basePaths) {
-      this.maxDepth = maxDepth;
+      this.partColNames = partColNames;
       this.pd = pd;
       this.fs = fs;
       this.pendingPaths = basePaths;
@@ -457,39 +474,50 @@ public class HiveMetaStoreChecker {
       FileStatus[] fileStatuses = fs.listStatus(currentPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
       // found no files under a sub-directory under table base path; it is possible that the table
       // is empty and hence there are no partition sub-directories created under base path
-      if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth < maxDepth) {
+      if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth < partColNames.size()) {
         // since maxDepth is not yet reached, we are missing partition
         // columns in currentPath
-        if (throwException) {
-          throw new HiveException(
-              "MSCK is missing partition columns under " + currentPath.toString());
-        } else {
-          LOG.warn("MSCK is missing partition columns under " + currentPath.toString());
-        }
+        logOrThrowExceptionWithMsg(
+            "MSCK is missing partition columns under " + currentPath.toString());
       } else {
         // found files under currentPath add them to the queue if it is a directory
         for (FileStatus fileStatus : fileStatuses) {
-          if (!fileStatus.isDirectory() && currentDepth < maxDepth) {
+          if (!fileStatus.isDirectory() && currentDepth < partColNames.size()) {
             // found a file at depth which is less than number of partition keys
-            if (throwException) {
-              throw new HiveException(
-                  "MSCK finds a file rather than a directory when it searches for "
-                      + fileStatus.getPath().toString());
+            logOrThrowExceptionWithMsg(
+                "MSCK finds a file rather than a directory when it searches for "
+                    + fileStatus.getPath().toString());
+          } else if (fileStatus.isDirectory() && currentDepth < partColNames.size()) {
+            // found a sub-directory at a depth less than number of partition keys
+            // validate that the partition directory name matches the corresponding
+            // partition colName at currentDepth
+            Path nextPath = fileStatus.getPath();
+            String[] parts = nextPath.getName().split("=");
+            if (parts.length != 2) {
+              logOrThrowExceptionWithMsg("Invalid partition name " + nextPath);
+            } else if (!parts[0].equalsIgnoreCase(partColNames.get(currentDepth))) {
+              logOrThrowExceptionWithMsg(
+                  "Unexpected partition key " + parts[0] + " found at " + nextPath);
             } else {
-              LOG.warn("MSCK finds a file rather than a directory when it searches for "
-                  + fileStatus.getPath().toString());
+              // add sub-directory to the work queue if the partition-column depth is not yet reached
+              pendingPaths.add(new PathDepthInfo(nextPath, currentDepth + 1));
             }
-          } else if (fileStatus.isDirectory() && currentDepth < maxDepth) {
-            // add sub-directory to the work queue if maxDepth is not yet reached
-            pendingPaths.add(new PathDepthInfo(fileStatus.getPath(), currentDepth + 1));
           }
         }
-        if (currentDepth == maxDepth) {
+        if (currentDepth == partColNames.size()) {
           return currentPath;
         }
       }
       return null;
     }
+
+    private void logOrThrowExceptionWithMsg(String msg) throws HiveException {
+      if(throwException) {
+        throw new HiveException(msg);
+      } else {
+        LOG.warn(msg);
+      }
+    }
   }
 
   private static class PathDepthInfo {
@@ -503,7 +531,7 @@ public class HiveMetaStoreChecker {
 
   private void checkPartitionDirs(final ExecutorService executor,
       final Path basePath, final Set<Path> result,
-      final FileSystem fs, final int maxDepth) throws HiveException {
+      final FileSystem fs, final List<String> partColNames) throws HiveException {
     try {
       Queue<Future<Path>> futures = new LinkedList<Future<Path>>();
       ConcurrentLinkedQueue<PathDepthInfo> nextLevel = new ConcurrentLinkedQueue<>();
@@ -520,7 +548,7 @@ public class HiveMetaStoreChecker {
         //process each level in parallel
         while(!nextLevel.isEmpty()) {
           futures.add(
-              executor.submit(new PathDepthInfoCallable(nextLevel.poll(), maxDepth, fs, tempQueue)));
+              executor.submit(new PathDepthInfoCallable(nextLevel.poll(), partColNames, fs, tempQueue)));
         }
         while(!futures.isEmpty()) {
           Path p = futures.poll().get();
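
The partition retrieval above now depends on the mapred mode: in strict mode MSCK iterates partitions lazily in metastore batches instead of loading them all up front. A minimal sketch of the strict-mode branch; the hive and table handles are illustrative:

  PartitionIterable parts = new PartitionIterable(hive, table, null,
      conf.getIntVar(HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
  for (Partition partition : parts) {
    // each batch of partitions is fetched from the metastore only when the
    // iteration reaches it
  }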

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index 8eb011e..a319b88 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -391,7 +391,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
       throw new MetaException("Temp table path not set for " + tbl.getTableName());
     } else {
       if (!wh.isDir(tblPath)) {
-        if (!wh.mkdirs(tblPath, true)) {
+        if (!wh.mkdirs(tblPath)) {
           throw new MetaException(tblPath
               + " is not a directory or unable to create one");
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index c53ddad..5efaf70 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -18,15 +18,19 @@
 
 package org.apache.hadoop.hive.ql.metadata;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -504,6 +508,14 @@ public class Table implements Serializable {
     return null;
   }
 
+  public List<String> getPartColNames() {
+    List<String> partColNames = new ArrayList<String>();
+    for (FieldSchema key : getPartCols()) {
+      partColNames.add(key.getName());
+    }
+    return partColNames;
+  }
+
   public boolean isPartitionKey(String colName) {
     return getPartColByName(colName) == null ? false : true;
   }
@@ -936,6 +948,16 @@ public class Table implements Serializable {
     }
   }
 
+  public boolean isEmpty() throws HiveException {
+    Preconditions.checkNotNull(getPath());
+    try {
+      FileSystem fs = FileSystem.get(getPath().toUri(), SessionState.getSessionConf());
+      return !fs.exists(getPath()) || fs.listStatus(getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER).length == 0;
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
+  }
+
   public boolean isTemporary() {
     return tTable.isTemporary();
   }
@@ -964,7 +986,7 @@ public class Table implements Serializable {
 
   public static void validateColumns(List<FieldSchema> columns, List<FieldSchema> partCols)
       throws HiveException {
-    List<String> colNames = new ArrayList<String>();
+    Set<String> colNames = new HashSet<>();
     for (FieldSchema partCol: columns) {
       String colName = normalize(partCol.getName());
       if (colNames.contains(colName)) {

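The Table.java hunk above introduces two small helpers, getPartColNames() and isEmpty(). A minimal sketch of how a caller might use them; the class and method names below are illustrative assumptions, only the two Table methods come from the change:

import java.util.List;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;

public class TableHelperSketch {
  // Returns true when the table directory holds visible data files.
  static boolean hasVisibleData(Table tbl) throws HiveException {
    // getPartColNames() returns just the names, in getPartCols() order
    List<String> partColNames = tbl.getPartColNames();
    System.out.println("partition columns: " + partColNames);
    // isEmpty() is true when the path is missing or contains only hidden files,
    // and wraps any IOException from the FileSystem lookup in a HiveException
    return !tbl.isEmpty();
  }
}
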
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
index 044d64c..2435bf1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
@@ -60,7 +60,7 @@ public enum VirtualColumn {
    */
   GROUPINGID("GROUPING__ID", TypeInfoFactory.intTypeInfo);
 
-  public static ImmutableSet<String> VIRTUAL_COLUMN_NAMES =
+  public static final ImmutableSet<String> VIRTUAL_COLUMN_NAMES =
       ImmutableSet.of(FILENAME.getName(), BLOCKOFFSET.getName(), ROWOFFSET.getName(),
           RAWDATASIZE.getName(), GROUPINGID.getName(), ROWID.getName());
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
index 7e39d77..d59603e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
@@ -158,14 +158,15 @@ public class ColumnPruner extends Transform {
       boolean walkChildren = true;
       opStack.push(nd);
 
-      // no need to go further down for a select op with a file sink or script
-      // child
-      // since all cols are needed for these ops
+      // no need to go further down for a select op whose children are all file
+      // sink or script operators, since all cols are needed for these ops.
+      // However, if any child is not a file sink or script, we still go down.
       if (nd instanceof SelectOperator) {
+        walkChildren = false;
         for (Node child : nd.getChildren()) {
-          if ((child instanceof FileSinkOperator)
-              || (child instanceof ScriptOperator)) {
-            walkChildren = false;
+          if (!(child instanceof FileSinkOperator || child instanceof ScriptOperator)) {
+            walkChildren = true;
+            break;
           }
         }
       }

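The reworked SelectOperator check above starts from walkChildren = false and flips it back to true as soon as any child is neither a FileSinkOperator nor a ScriptOperator. A standalone sketch of the same predicate, assuming a non-null children list (the helper class and method names are invented for illustration):

import java.util.List;

import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.ScriptOperator;
import org.apache.hadoop.hive.ql.lib.Node;

public class SelectChildCheckSketch {
  // Descend only if at least one child still needs column pruning,
  // i.e. it is not a file sink and not a script operator.
  static boolean shouldWalkChildren(List<? extends Node> children) {
    for (Node child : children) {
      if (!(child instanceof FileSinkOperator || child instanceof ScriptOperator)) {
        return true;
      }
    }
    return false;
  }
}
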
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
index 00ec03e..45839ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
@@ -671,10 +671,11 @@ public final class ColumnPrunerProcFactory {
 
       List<FieldNode> colsAfterReplacement = new ArrayList<>();
       List<FieldNode> newCols = new ArrayList<>();
-      for (FieldNode col : cols) {
-        int index = outputCols.indexOf(col.getFieldName());
+      for (int index = 0; index < numSelColumns; index++) {
+        String colName = outputCols.get(index);
+        FieldNode col = lookupColumn(cols, colName);
         // colExprMap.size() == size of cols from SEL(*) branch
-        if (index >= 0 && index < numSelColumns) {
+        if (col != null) {
           ExprNodeDesc transformed = colExprMap.get(col.getFieldName());
           colsAfterReplacement = mergeFieldNodesWithDesc(colsAfterReplacement, transformed);
           newCols.add(col);
@@ -713,12 +714,14 @@ public final class ColumnPrunerProcFactory {
       RowSchema rs = op.getSchema();
       ArrayList<ExprNodeDesc> colList = new ArrayList<>();
       List<FieldNode> outputCols = new ArrayList<>();
-      for (FieldNode col : cols) {
-        // revert output cols of SEL(*) to ExprNodeColumnDesc
-        ColumnInfo colInfo = rs.getColumnInfo(col.getFieldName());
-        ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo);
-        colList.add(colExpr);
-        outputCols.add(col);
+      for (ColumnInfo colInfo : rs.getSignature()) {
+        FieldNode col = lookupColumn(cols, colInfo.getInternalName());
+        if (col != null) {
+          // revert output cols of SEL(*) to ExprNodeColumnDesc
+          ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo);
+          colList.add(colExpr);
+          outputCols.add(col);
+        }
       }
       // replace SEL(*) to SEL(exprs)
       ((SelectDesc)select.getConf()).setSelStarNoCompute(false);
@@ -810,11 +813,18 @@ public final class ColumnPrunerProcFactory {
         ArrayList<String> newOutputColumnNames = new ArrayList<String>();
         ArrayList<ColumnInfo> rs_oldsignature = op.getSchema().getSignature();
         ArrayList<ColumnInfo> rs_newsignature = new ArrayList<ColumnInfo>();
+        // The pruning needs to preserve the order of columns in the input schema
+        Set<String> colNames = new HashSet<String>();
         for (FieldNode col : cols) {
-          int index = originalOutputColumnNames.indexOf(col.getFieldName());
-          newOutputColumnNames.add(col.getFieldName());
-          newColList.add(originalColList.get(index));
-          rs_newsignature.add(rs_oldsignature.get(index));
+          colNames.add(col.getFieldName());
+        }
+        for (int i = 0; i < originalOutputColumnNames.size(); i++) {
+          String colName = originalOutputColumnNames.get(i);
+          if (colNames.contains(colName)) {
+            newOutputColumnNames.add(colName);
+            newColList.add(originalColList.get(i));
+            rs_newsignature.add(rs_oldsignature.get(i));
+          }
         }
         op.getSchema().setSignature(rs_newsignature);
         conf.setColList(newColList);

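The last hunk above replaces per-column index lookups with a membership set so that the pruned columns come out in the order of the original output schema. The pattern in isolation, with generic names that are assumptions for illustration only:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OrderPreservingPrune {
  // Keep only the requested columns, but emit them in the original schema order.
  static List<String> prune(List<String> originalOrder, Collection<String> requested) {
    Set<String> wanted = new HashSet<>(requested);      // fast membership test
    List<String> kept = new ArrayList<>();
    for (String col : originalOrder) {                  // input schema order wins
      if (wanted.contains(col)) {
        kept.add(col);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    // requested out of order; the result still follows the original schema order
    System.out.println(prune(Arrays.asList("a", "b", "c", "d"), Arrays.asList("d", "b")));
    // prints [b, d]
  }
}
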
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index e68618a..d0fdb52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
 import org.apache.hadoop.hive.ql.parse.GenTezUtils;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
@@ -68,6 +69,8 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * ConvertJoinMapJoin is an optimization that replaces a common join
  * (aka shuffle join) with a map join (aka broadcast or fragment replicate
@@ -95,15 +98,19 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     JoinOperator joinOp = (JoinOperator) nd;
     long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
 
+    // adjust noconditional task size threshold for LLAP
+    maxSize = getNoConditionalTaskSizeForLlap(maxSize, context.conf);
+    joinOp.getConf().setNoConditionalTaskSize(maxSize);
+
     TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
       if (retval == null) {
         return retval;
       } else {
-        fallbackToReduceSideJoin(joinOp, context);
+        fallbackToReduceSideJoin(joinOp, context, maxSize);
         return null;
       }
     }
@@ -120,13 +127,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     LOG.info("Estimated number of buckets " + numBuckets);
     int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize, true);
     if (mapJoinConversionPos < 0) {
-      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
       if (retval == null) {
         return retval;
       } else {
         // only case is full outer join with SMB enabled which is not possible. Convert to regular
         // join.
-        fallbackToReduceSideJoin(joinOp, context);
+        fallbackToReduceSideJoin(joinOp, context, maxSize);
         return null;
       }
     }
@@ -147,7 +154,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     if (mapJoinConversionPos < 0) {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
       return null;
     }
 
@@ -164,15 +171,54 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     return null;
   }
 
+  @VisibleForTesting
+  public long getNoConditionalTaskSizeForLlap(final long maxSize, final HiveConf conf) {
+    if ("llap".equalsIgnoreCase(conf.getVar(ConfVars.HIVE_EXECUTION_MODE))) {
+      LlapClusterStateForCompile llapInfo = LlapClusterStateForCompile.getClusterInfo(conf);
+      llapInfo.initClusterInfo();
+      final int executorsPerNode;
+      if (!llapInfo.hasClusterInfo()) {
+        LOG.warn("LLAP cluster information not available. Falling back to getting #executors from hiveconf..");
+        executorsPerNode = conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+      } else {
+        final int numExecutorsPerNodeFromCluster = llapInfo.getNumExecutorsPerNode();
+        if (numExecutorsPerNodeFromCluster == -1) {
+          LOG.warn("Cannot determine executor count from LLAP cluster information. Falling back to getting #executors" +
+            " from hiveconf..");
+          executorsPerNode = conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+        } else {
+          executorsPerNode = numExecutorsPerNodeFromCluster;
+        }
+      }
+      final int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
+      if (numSessions > 0) {
+        final int availableSlotsPerQuery = (int) ((double) executorsPerNode / numSessions);
+        final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
+        final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY);
+        final int slotsPerQuery = Math.min(maxSlotsPerQuery, availableSlotsPerQuery);
+        final long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery));
+        LOG.info("No conditional task size adjusted for LLAP. executorsPerNode: {}, numSessions: {}, " +
+            "availableSlotsPerQuery: {}, overSubscriptionFactor: {}, maxSlotsPerQuery: {}, slotsPerQuery: {}, " +
+            "noconditionalTaskSize: {}, adjustedNoconditionalTaskSize: {}", executorsPerNode, numSessions,
+          availableSlotsPerQuery, overSubscriptionFactor, maxSlotsPerQuery, slotsPerQuery, maxSize, llapMaxSize);
+        return Math.max(maxSize, llapMaxSize);
+      } else {
+        LOG.warn(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname + " returned value {}. Returning {}" +
+          " as no conditional task size for LLAP.", numSessions, maxSize);
+      }
+    }
+    return maxSize;
+  }
+
   @SuppressWarnings("unchecked")
   private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp,
-      TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+    TezBucketJoinProcCtx tezBucketJoinProcCtx, final long maxSize) throws SemanticException {
     // we cannot convert to bucket map join, we cannot convert to
     // map join either based on the size. Check if we can convert to SMB join.
     if ((HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false)
       || ((!HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN_REDUCE))
           && joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
       return null;
     }
     Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
@@ -201,7 +247,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       // contains aliases from sub-query
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
       return null;
     }
 
@@ -211,7 +257,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     } else {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
     }
     return null;
   }
@@ -235,7 +281,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
                   joinOp.getConf().getBaseSrc(), joinOp).getSecond(),
                   null, joinDesc.getExprs(), null, null,
                   joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
-                  joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
+                  joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null, joinDesc.getNoConditionalTaskSize());
       mapJoinDesc.setNullSafes(joinDesc.getNullSafes());
       mapJoinDesc.setFilterMap(joinDesc.getFilterMap());
       mapJoinDesc.setResidualFilterExprs(joinDesc.getResidualFilterExprs());
@@ -794,7 +840,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       // The semijoin branch can potentially create a task level cycle
       // with the hashjoin except when it is dynamically partitioned hash
       // join which takes place in a separate task.
-      if (context.parseContext.getRsOpToTsOpMap().size() > 0
+      if (context.parseContext.getRsToSemiJoinBranchInfo().size() > 0
               && removeReduceSink) {
         removeCycleCreatingSemiJoinOps(mapJoinOp, parentSelectOpOfBigTableOp,
                 context.parseContext);
@@ -826,7 +872,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       }
 
       ReduceSinkOperator rs = (ReduceSinkOperator) op;
-      TableScanOperator ts = parseContext.getRsOpToTsOpMap().get(rs);
+      TableScanOperator ts = parseContext.getRsToSemiJoinBranchInfo().get(rs).getTsOp();
       if (ts == null) {
         // skip, no semijoin branch
         continue;
@@ -851,6 +897,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     }
     if (semiJoinMap.size() > 0) {
       for (ReduceSinkOperator rs : semiJoinMap.keySet()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found semijoin optimization from the big table side of a map join, which will cause a task cycle. "
+              + "Removing semijoin "
+              + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(semiJoinMap.get(rs)));
+        }
         GenTezUtils.removeBranch(rs);
         GenTezUtils.removeSemiJoinOperator(parseContext, rs,
                 semiJoinMap.get(rs));
@@ -923,15 +974,14 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     return numBuckets;
   }
 
-  private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+  private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+    final long maxSize)
     throws SemanticException {
     // Attempt dynamic partitioned hash join
     // Since we don't have big table index yet, must start with estimate of numReducers
     int numReducers = estimateNumBuckets(joinOp, false);
     LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
-    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false,
-            context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD),
-            false);
+    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false, maxSize, false);
     if (bigTablePos >= 0) {
       // Now that we have the big table index, get real numReducers value based on big table RS
       ReduceSinkOperator bigTableParentRS =
@@ -966,11 +1016,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     return false;
   }
 
-  private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+  private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context, final long maxSize)
       throws SemanticException {
     if (context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &&
         context.conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN)) {
-      if (convertJoinDynamicPartitionedHashJoin(joinOp, context)) {
+      if (convertJoinDynamicPartitionedHashJoin(joinOp, context, maxSize)) {
         return;
       }
     }

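The new getNoConditionalTaskSizeForLlap() above scales the noconditional task size by min(maxSlotsPerQuery, executorsPerNode / numSessions) times the oversubscription factor. A self-contained sketch of the same arithmetic with invented inputs, just to make the formula concrete (none of these values are defaults):

public class LlapThresholdSketch {
  // Mirrors the adjustment in getNoConditionalTaskSizeForLlap(); all inputs are made up.
  static long adjustedThreshold(long maxSize, int executorsPerNode, int numSessions,
      double overSubscriptionFactor, int maxSlotsPerQuery) {
    if (numSessions <= 0) {
      return maxSize;                                    // no adjustment possible
    }
    int availableSlotsPerQuery = (int) ((double) executorsPerNode / numSessions);
    int slotsPerQuery = Math.min(maxSlotsPerQuery, availableSlotsPerQuery);
    long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery));
    return Math.max(maxSize, llapMaxSize);
  }

  public static void main(String[] args) {
    // e.g. 10 MB threshold, 12 executors/node, 4 sessions, 0.2 factor, cap of 3 slots:
    // slotsPerQuery = min(3, 12/4) = 3, so the threshold grows to 10 MB * (1 + 0.2 * 3) = 16 MB
    System.out.println(adjustedThreshold(10L << 20, 12, 4, 0.2, 3)); // 16777216
  }
}
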
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index b6db6aa..b8c0102 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -42,12 +43,7 @@ import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
-import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.*;
 import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
 import org.apache.hadoop.hive.ql.plan.*;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
@@ -214,16 +210,34 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
         } else {
           LOG.debug("Column " + column + " is not a partition column");
           if (semiJoin && ts.getConf().getFilterExpr() != null) {
-            LOG.debug("Initiate semijoin reduction for " + column);
-            // Get the table name from which the min-max values will come.
+            LOG.debug("Initiate semijoin reduction for " + column + " ("
+                + ts.getConf().getFilterExpr().getExprString() + ")");
+            // Get the table name from which the min-max values and bloom filter will come.
             Operator<?> op = ctx.generator;
+
             while (!(op == null || op instanceof TableScanOperator)) {
               op = op.getParentOperators().get(0);
             }
             String tableAlias = (op == null ? "" : ((TableScanOperator) op).getConf().getAlias());
+
             keyBaseAlias = ctx.generator.getOperatorId() + "_" + tableAlias + "_" + column;
 
-            semiJoinAttempted = generateSemiJoinOperatorPlan(ctx, parseContext, ts, keyBaseAlias);
+            Map<String, SemiJoinHint> hints = parseContext.getSemiJoinHints();
+            if (hints != null) {
+              // If hints map has no entry that would imply that user enforced
+              // no runtime filtering.
+              if (hints.size() > 0) {
+                SemiJoinHint sjHint = hints.get(tableAlias);
+                semiJoinAttempted = generateSemiJoinOperatorPlan(
+                        ctx, parseContext, ts, keyBaseAlias, sjHint);
+                if (!semiJoinAttempted && sjHint != null) {
+                  throw new SemanticException("The user hint to enforce semijoin failed required conditions");
+                }
+              }
+            } else {
+              semiJoinAttempted = generateSemiJoinOperatorPlan(
+                      ctx, parseContext, ts, keyBaseAlias, null);
+            }
           }
         }
 
@@ -386,7 +400,13 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
 
   // Generates plan for min/max when dynamic partition pruning is ruled out.
   private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContext parseContext,
-      TableScanOperator ts, String keyBaseAlias) throws SemanticException {
+      TableScanOperator ts, String keyBaseAlias, SemiJoinHint sjHint) throws SemanticException {
+
+    // If semijoin hint is enforced, make sure hint is provided
+    if (parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY)
+            && sjHint == null) {
+        return false;
+    }
 
     // we will put a fork in the plan at the source of the reduce sink
     Operator<? extends OperatorDesc> parentOfRS = ctx.generator.getParentOperators().get(0);
@@ -402,33 +422,62 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
       exprNodeDesc = exprNodeDesc.getChildren().get(0);
     }
 
-    if (exprNodeDesc instanceof ExprNodeColumnDesc) {
-      internalColName = ((ExprNodeColumnDesc) exprNodeDesc).getColumn();
-      if (parentOfRS instanceof SelectOperator) {
-        // Make sure the semijoin branch is not on parition column.
-        ExprNodeColumnDesc colExpr = ((ExprNodeColumnDesc) (parentOfRS.
-                getColumnExprMap().get(internalColName)));
-        String colName = ExprNodeDescUtils.extractColName(colExpr);
-
-        // Fetch the TableScan Operator.
-        Operator<?> op = parentOfRS.getParentOperators().get(0);
-        while (op != null && !(op instanceof TableScanOperator)) {
-          op = op.getParentOperators().get(0);
-        }
-        assert op != null;
-
-        Table table = ((TableScanOperator) op).getConf().getTableMetadata();
-        if (table.isPartitionKey(colName)) {
-          // The column is partition column, skip the optimization.
-          return false;
-        }
-      }
-    } else {
+    if (!(exprNodeDesc instanceof ExprNodeColumnDesc)) {
       // No column found!
       // Bail out
       return false;
     }
 
+    internalColName = ((ExprNodeColumnDesc) exprNodeDesc).getColumn();
+    if (parentOfRS instanceof SelectOperator) {
+      // Make sure the semijoin branch is not on partition column.
+      ExprNodeDesc expr = parentOfRS.getColumnExprMap().get(internalColName);
+      while (!(expr instanceof ExprNodeColumnDesc) &&
+              (expr.getChildren() != null)) {
+        expr = expr.getChildren().get(0);
+      }
+
+      if (!(expr instanceof ExprNodeColumnDesc)) {
+        // No column found!
+        // Bail out
+        return false;
+      }
+
+      ExprNodeColumnDesc colExpr = (ExprNodeColumnDesc) expr;
+      String colName = ExprNodeDescUtils.extractColName(colExpr);
+
+      // Fetch the TableScan Operator.
+      Operator<?> op = parentOfRS.getParentOperators().get(0);
+      while (op != null && !(op instanceof TableScanOperator)) {
+        op = op.getParentOperators().get(0);
+      }
+      assert op != null;
+
+      Table table = ((TableScanOperator) op).getConf().getTableMetadata();
+      if (table.isPartitionKey(colName)) {
+        // The column is partition column, skip the optimization.
+        return false;
+      }
+    }
+
+    // If hint is provided and only hinted semijoin optimizations should be
+    // created, then skip other columns on the table
+    if (parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY)
+            && sjHint.getColName() != null &&
+            !internalColName.equals(sjHint.getColName())) {
+      return false;
+    }
+
+    // Check if there already exists a semijoin branch
+    GroupByOperator gb = parseContext.getColExprToGBMap().get(key);
+    if (gb != null) {
+      // Already an existing semijoin branch, reuse it
+      createFinalRsForSemiJoinOp(parseContext, ts, gb, key, keyBaseAlias,
+              ctx.parent.getChildren().get(0), sjHint != null);
+      // done!
+      return true;
+    }
+
     List<ExprNodeDesc> keyExprs = new ArrayList<ExprNodeDesc>();
     keyExprs.add(key);
 
@@ -472,8 +521,6 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
             HiveConf.getFloatVar(parseContext.getConf(),
                     HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
 
-    ArrayList<ExprNodeDesc> groupByExprs = new ArrayList<ExprNodeDesc>();
-
     // Add min/max and bloom filter aggregations
     List<ObjectInspector> aggFnOIs = new ArrayList<ObjectInspector>();
     aggFnOIs.add(key.getWritableObjectInspector());
@@ -493,9 +540,17 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
       AggregationDesc bloomFilter = new AggregationDesc("bloom_filter",
               FunctionRegistry.getGenericUDAFEvaluator("bloom_filter", aggFnOIs, false, false),
               params, false, Mode.PARTIAL1);
-      GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
+      GenericUDAFBloomFilterEvaluator bloomFilterEval =
+          (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
       bloomFilterEval.setSourceOperator(selectOp);
+
+      if (sjHint != null && sjHint.getNumEntries() > 0) {
+        LOG.debug("Setting size for " + keyBaseAlias + " to " + sjHint.getNumEntries() + " based on the hint");
+        bloomFilterEval.setHintEntries(sjHint.getNumEntries());
+      }
       bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES));
+      bloomFilterEval.setMinEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MIN_BLOOM_FILTER_ENTRIES));
+      bloomFilterEval.setFactor(parseContext.getConf().getFloatVar(ConfVars.TEZ_BLOOM_FILTER_FACTOR));
       bloomFilter.setGenericUDAFWritableEvaluator(bloomFilterEval);
       aggs.add(min);
       aggs.add(max);
@@ -547,6 +602,8 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
     Map<String, ExprNodeDesc> columnExprMap = new HashMap<String, ExprNodeDesc>();
     rsOp.setColumnExprMap(columnExprMap);
 
+    rsOp.getConf().setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.QUICKSTART));
+
     // Create the final Group By Operator
     ArrayList<AggregationDesc> aggsFinal = new ArrayList<AggregationDesc>();
     try {
@@ -591,7 +648,12 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
               bloomFilterFinalParams, false, Mode.FINAL);
       GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
       bloomFilterEval.setSourceOperator(selectOp);
+      if (sjHint != null && sjHint.getNumEntries() > 0) {
+        bloomFilterEval.setHintEntries(sjHint.getNumEntries());
+      }
       bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES));
+      bloomFilterEval.setMinEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MIN_BLOOM_FILTER_ENTRIES));
+      bloomFilterEval.setFactor(parseContext.getConf().getFloatVar(ConfVars.TEZ_BLOOM_FILTER_FACTOR));
       bloomFilter.setGenericUDAFWritableEvaluator(bloomFilterEval);
 
       aggsFinal.add(min);
@@ -617,23 +679,56 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
       rsOp.getConf().setOutputOperators(outputOperators);
     }
 
+    createFinalRsForSemiJoinOp(parseContext, ts, groupByOpFinal, key,
+            keyBaseAlias, ctx.parent.getChildren().get(0), sjHint != null);
+
+    return true;
+  }
+
+  private void createFinalRsForSemiJoinOp(
+          ParseContext parseContext, TableScanOperator ts, GroupByOperator gb,
+          ExprNodeDesc key, String keyBaseAlias, ExprNodeDesc colExpr,
+          boolean isHint) throws SemanticException {
+    ArrayList<String> gbOutputNames = new ArrayList<>();
+    // One each for min, max and bloom filter
+    gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(0));
+    gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(1));
+    gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(2));
+
+    int colPos = 0;
+    ArrayList<ExprNodeDesc> rsValueCols = new ArrayList<ExprNodeDesc>();
+    for (int i = 0; i < gbOutputNames.size() - 1; i++) {
+      ExprNodeColumnDesc expr = new ExprNodeColumnDesc(key.getTypeInfo(),
+              gbOutputNames.get(colPos++), "", false);
+      rsValueCols.add(expr);
+    }
+
+    // Bloom Filter uses binary
+    ExprNodeColumnDesc colBFExpr = new ExprNodeColumnDesc(TypeInfoFactory.binaryTypeInfo,
+            gbOutputNames.get(colPos++), "", false);
+    rsValueCols.add(colBFExpr);
+
     // Create the final Reduce Sink Operator
     ReduceSinkDesc rsDescFinal = PlanUtils.getReduceSinkDesc(
             new ArrayList<ExprNodeDesc>(), rsValueCols, gbOutputNames, false,
             -1, 0, 1, Operation.NOT_ACID);
     ReduceSinkOperator rsOpFinal = (ReduceSinkOperator)OperatorFactory.getAndMakeChild(
-            rsDescFinal, new RowSchema(groupByOpFinal.getSchema()), groupByOpFinal);
+            rsDescFinal, new RowSchema(gb.getSchema()), gb);
+    Map<String, ExprNodeDesc> columnExprMap = new HashMap<>();
     rsOpFinal.setColumnExprMap(columnExprMap);
 
-    LOG.debug("DynamicMinMaxPushdown: Saving RS to TS mapping: " + rsOpFinal + ": " + ts);
-    parseContext.getRsOpToTsOpMap().put(rsOpFinal, ts);
+    LOG.debug("DynamicSemiJoinPushdown: Saving RS to TS mapping: " + rsOpFinal + ": " + ts);
+    SemiJoinBranchInfo sjInfo = new SemiJoinBranchInfo(ts, isHint);
+    parseContext.getRsToSemiJoinBranchInfo().put(rsOpFinal, sjInfo);
 
     // for explain purpose
-    if (parseContext.getContext().getExplainConfig() != null
-        && parseContext.getContext().getExplainConfig().isFormatted()) {
-      List<String> outputOperators = new ArrayList<>();
+    if (parseContext.getContext().getExplainConfig() != null &&
+            parseContext.getContext().getExplainConfig().isFormatted()) {
+      List<String> outputOperators = rsOpFinal.getConf().getOutputOperators();
+      if (outputOperators == null) {
+        outputOperators = new ArrayList<>();
+      }
       outputOperators.add(ts.getOperatorId());
-      rsOpFinal.getConf().setOutputOperators(outputOperators);
     }
 
     // Save the info that is required at query time to resolve dynamic/runtime values.
@@ -648,9 +743,9 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
     runtimeValuesInfo.setTableDesc(rsFinalTableDesc);
     runtimeValuesInfo.setDynamicValueIDs(dynamicValueIDs);
     runtimeValuesInfo.setColExprs(rsValueCols);
+    runtimeValuesInfo.setTsColExpr(colExpr);
     parseContext.getRsToRuntimeValuesInfoMap().put(rsOpFinal, runtimeValuesInfo);
-
-    return true;
+    parseContext.getColExprToGBMap().put(key, gb);
   }
 
   private Map<Node, Object> collectDynamicPruningConditions(ExprNodeDesc pred, NodeProcessorCtx ctx)