Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:31:50 UTC
[03/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java
new file mode 100644
index 0000000..966c264
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.log;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.RandomAccessFileAppender;
+import org.apache.logging.log4j.core.appender.routing.Route;
+import org.apache.logging.log4j.core.appender.routing.Routes;
+import org.apache.logging.log4j.core.appender.routing.RoutingAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.config.Node;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.processor.PluginEntry;
+import org.apache.logging.log4j.core.config.plugins.util.PluginType;
+import org.apache.logging.log4j.core.filter.AbstractFilter;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+
+/**
+ * Divert appender to redirect and filter test operation logs to match the output of the original
+ * CLI qtest results.
+ */
+public final class LogDivertAppenderForTest {
+ private LogDivertAppenderForTest() {
+ // Prevent instantiation
+ }
+
+ /**
+ * A log filter that passes only the PREHOOK/POSTHOOK test messages logged by SessionState.
+ */
+ @Plugin(name = "TestFilter", category = "Core", elementType="filter", printObject = true)
+ private static class TestFilter extends AbstractFilter {
+ @Override
+ public Result filter(LogEvent event) {
+ if (event.getLevel().equals(Level.INFO) && "SessionState".equals(event.getLoggerName())) {
+ if (event.getMessage().getFormattedMessage().startsWith("PREHOOK:")
+ || event.getMessage().getFormattedMessage().startsWith("POSTHOOK:")) {
+ return Result.ACCEPT;
+ }
+ }
+ return Result.DENY;
+ }
+
+ @PluginFactory
+ public static TestFilter createFilter() {
+ return new TestFilter();
+ }
+ }
+
+ /**
+ * If HIVE_IN_TEST is set, then programmatically registers a routing appender in the Log4j
+ * configuration, which automatically writes the test log of each query to an individual file.
+ * The equivalent property configuration is as follows:
+ * # queryId based routing file appender
+ appender.test-query-routing.type = Routing
+ appender.test-query-routing.name = test-query-routing
+ appender.test-query-routing.routes.type = Routes
+ appender.test-query-routing.routes.pattern = $${ctx:queryId}
+ # default route
+ appender.test-query-routing.routes.test-route-default.type = Route
+ appender.test-query-routing.routes.test-route-default.key = $${ctx:queryId}
+ appender.test-query-routing.routes.test-route-default.app.type = NullAppender
+ appender.test-query-routing.routes.test-route-default.app.name = test-null-appender
+ # queryId based route
+ appender.test-query-routing.routes.test-route-mdc.type = Route
+ appender.test-query-routing.routes.test-route-mdc.name = test-query-routing
+ appender.test-query-routing.routes.test-route-mdc.app.type = RandomAccessFile
+ appender.test-query-routing.routes.test-route-mdc.app.name = test-query-file-appender
+ appender.test-query-routing.routes.test-route-mdc.app.fileName = ${sys:hive.log.dir}/${ctx:sessionId}/${ctx:queryId}.test
+ appender.test-query-routing.routes.test-route-mdc.app.layout.type = PatternLayout
+ appender.test-query-routing.routes.test-route-mdc.app.layout.pattern = %d{ISO8601} %5p %c{2}: %m%n
+ appender.test-query-routing.routes.test-route-mdc.app.filter.type = TestFilter
+ * @param conf the configuration for the HiveServer2 instance
+ */
+ public static void registerRoutingAppenderIfInTest(org.apache.hadoop.conf.Configuration conf) {
+ if (!conf.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname,
+ HiveConf.ConfVars.HIVE_IN_TEST.defaultBoolVal)) {
+ // If not in test mode, then do not create the appender
+ return;
+ }
+
+ String logLocation =
+ HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION);
+
+ // Create test-null-appender to drop events without queryId
+ PluginEntry nullAppenderEntry = new PluginEntry();
+ nullAppenderEntry.setClassName(NullAppender.class.getName());
+ PluginType<NullAppender> nullAppenderType =
+ new PluginType<NullAppender>(nullAppenderEntry, NullAppender.class, "appender");
+ Node nullAppenderChildNode = new Node(null, "test-null-appender", nullAppenderType);
+
+ // Create default route where events go without queryId
+ PluginEntry defaultRouteEntry = new PluginEntry();
+ defaultRouteEntry.setClassName(Route.class.getName());
+ PluginType<Route> defaultRouteType = new PluginType<Route>(defaultRouteEntry, Route.class, "");
+ Node defaultRouteNode = new Node(null, "test-route-default", defaultRouteType);
+ // Add the test-null-appender to the default route
+ defaultRouteNode.getChildren().add(nullAppenderChildNode);
+
+ // Create queryId based route
+ PluginEntry queryIdRouteEntry = new PluginEntry();
+ queryIdRouteEntry.setClassName(Route.class.getName());
+ PluginType<Route> queryIdRouteType = new PluginType<Route>(queryIdRouteEntry, Route.class, "");
+ Node queryIdRouteNode = new Node(null, "test-route-mdc", queryIdRouteType);
+
+ // Create the queryId appender for the queryId route
+ PluginEntry queryIdAppenderEntry = new PluginEntry();
+ queryIdAppenderEntry.setClassName(RandomAccessFileAppender.class.getName());
+ PluginType<RandomAccessFileAppender> queryIdAppenderType =
+ new PluginType<RandomAccessFileAppender>(queryIdAppenderEntry,
+ RandomAccessFileAppender.class, "appender");
+ Node queryIdAppenderNode =
+ new Node(queryIdRouteNode, "test-query-file-appender", queryIdAppenderType);
+ queryIdAppenderNode.getAttributes().put("fileName", logLocation
+ + "/${ctx:sessionId}/${ctx:queryId}.test");
+ queryIdAppenderNode.getAttributes().put("name", "test-query-file-appender");
+ // Add the queryId appender to the queryId based route
+ queryIdRouteNode.getChildren().add(queryIdAppenderNode);
+
+ // Create the filter for the queryId appender
+ PluginEntry filterEntry = new PluginEntry();
+ filterEntry.setClassName(TestFilter.class.getName());
+ PluginType<TestFilter> filterType =
+ new PluginType<TestFilter>(filterEntry, TestFilter.class, "");
+ Node filterNode = new Node(queryIdAppenderNode, "test-filter", filterType);
+ // Add the filter to the queryId appender
+ queryIdAppenderNode.getChildren().add(filterNode);
+
+ // Create the layout for the queryId appender
+ PluginEntry layoutEntry = new PluginEntry();
+ layoutEntry.setClassName(PatternLayout.class.getName());
+ PluginType<PatternLayout> layoutType =
+ new PluginType<PatternLayout>(layoutEntry, PatternLayout.class, "");
+ Node layoutNode = new Node(queryIdAppenderNode, "PatternLayout", layoutType);
+ layoutNode.getAttributes().put("pattern", LogDivertAppender.nonVerboseLayout);
+ // Add the layout to the queryId appender
+ queryIdAppenderNode.getChildren().add(layoutNode);
+
+ // Create the route objects based on the Nodes
+ Route defaultRoute = Route.createRoute(null, "${ctx:queryId}", defaultRouteNode);
+ Route mdcRoute = Route.createRoute(null, null, queryIdRouteNode);
+ // Create the routes group
+ Routes routes = Routes.createRoutes("${ctx:queryId}", defaultRoute, mdcRoute);
+
+ LoggerContext context = (LoggerContext)LogManager.getContext(false);
+ Configuration configuration = context.getConfiguration();
+
+ // Create the appender
+ RoutingAppender routingAppender = RoutingAppender.createAppender("test-query-routing",
+ "true",
+ routes,
+ configuration,
+ null,
+ null,
+ null);
+
+ LoggerConfig loggerConfig = configuration.getRootLogger();
+ loggerConfig.addAppender(routingAppender, null, null);
+ context.updateLoggers();
+ routingAppender.start();
+ }
+}
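
The routing registered above is driven entirely by the queryId and sessionId entries in the Log4j thread context: while queryId is set, events follow the mdc route into ${hive.log.dir}/<sessionId>/<queryId>.test; otherwise they fall through to the NullAppender default route. A minimal sketch of how a caller exercises this (the id values below are hypothetical; Hive populates the context elsewhere):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;

public class QueryLogRoutingSketch {
  // TestFilter only accepts INFO events from a logger named "SessionState"
  private static final Logger CONSOLE = LogManager.getLogger("SessionState");

  public static void main(String[] args) {
    ThreadContext.put("sessionId", "session-1"); // hypothetical ids
    ThreadContext.put("queryId", "query-0001");
    try {
      CONSOLE.info("PREHOOK: query: select 1"); // accepted by TestFilter
      CONSOLE.info("some unrelated line");      // denied by TestFilter
    } finally {
      // with queryId unset, later events match the default route -> NullAppender
      ThreadContext.remove("queryId");
      ThreadContext.remove("sessionId");
    }
  }
}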
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index ea87cb4..de68876 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -697,6 +697,11 @@ public class Hive {
throws InvalidOperationException, HiveException {
try {
validatePartition(newPart);
+ String location = newPart.getLocation();
+ if (location != null && !Utilities.isDefaultNameNode(conf)) {
+ location = Utilities.getQualifiedPath(conf, new Path(location));
+ newPart.setLocation(location);
+ }
getMSC().alter_partition(dbName, tblName, newPart.getTPartition(), environmentContext);
} catch (MetaException e) {
@@ -736,6 +741,11 @@ public class Hive {
if (tmpPart.getParameters() != null) {
tmpPart.getParameters().remove(hive_metastoreConstants.DDL_TIME);
}
+ String location = tmpPart.getLocation();
+ if (location != null && !Utilities.isDefaultNameNode(conf)) {
+ location = Utilities.getQualifiedPath(conf, new Path(location));
+ tmpPart.setLocation(location);
+ }
newTParts.add(tmpPart.getTPartition());
}
getMSC().alter_partitions(names[0], names[1], newTParts, environmentContext);
@@ -1208,6 +1218,27 @@ public class Hive {
}
}
+
+
+ /**
+ * Truncates the table/partition as per specifications. Just trashes the data files.
+ *
+ * @param dbDotTableName
+ * name of the table
+ * @param partSpec
+ * specification of the partitions to truncate, or null to truncate the whole table
+ * @throws HiveException
+ */
+ public void truncateTable(String dbDotTableName, Map<String, String> partSpec) throws HiveException {
+ try {
+ Table table = getTable(dbDotTableName, true);
+
+ List<String> partNames = ((null == partSpec)
+ ? null : getPartitionNames(table.getDbName(), table.getTableName(), partSpec, (short) -1));
+ getMSC().truncateTable(table.getDbName(), table.getTableName(), partNames);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
public HiveConf getConf() {
return (conf);
}
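
For reference, a minimal usage sketch for the new truncateTable() above (the table name and partition spec are made up for illustration):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Hive;

public class TruncateSketch {
  public static void main(String[] args) throws Exception {
    Hive hive = Hive.get(new HiveConf());
    // null partSpec: truncate the whole table (all partitions, if any)
    hive.truncateTable("default.sales", null);
    // non-null partSpec: truncate only the matching partition(s)
    Map<String, String> partSpec = new HashMap<>();
    partSpec.put("ds", "2017-05-05");
    hive.truncateTable("default.sales", partSpec);
  }
}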
@@ -1413,7 +1444,6 @@ public class Hive {
*/
public List<String> getTablesByType(String dbName, String pattern, TableType type)
throws HiveException {
- List<String> retList = new ArrayList<String>();
if (dbName == null)
dbName = SessionState.get().getCurrentDatabase();
@@ -1578,7 +1608,20 @@ public class Hive {
return getDatabase(currentDb);
}
- public void loadSinglePartition(Path loadPath, String tableName,
+ /**
+ * @param loadPath the path from which data is to be loaded
+ * @param tableName name of the table to load into
+ * @param partSpec partition specification for the partition being loaded
+ * @param replace whether existing data in the partition should be replaced
+ * @param inheritTableSpecs whether the partition inherits
+ * location/inputformat/outputformat/serde details from the table spec
+ * @param isSkewedStoreAsSubdir whether skewed data is stored as sub-directories
+ * @param isSrcLocal if the source directory is LOCAL
+ * @param isAcid true if this is an ACID operation
+ * @param hasFollowingStatsTask true if there is a following task which updates the stats
+ * @throws HiveException
+ */
+ public void loadPartition(Path loadPath, String tableName,
Map<String, String> partSpec, boolean replace, boolean inheritTableSpecs,
boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid,
boolean hasFollowingStatsTask, Long mmWriteId, boolean isCommitMmWrite)
@@ -1594,7 +1637,6 @@ public class Hive {
}
}
-
public void commitMmTableWrite(Table tbl, Long mmWriteId)
throws HiveException {
try {
@@ -1623,7 +1665,11 @@ public class Hive {
* location/inputformat/outputformat/serde details from table spec
* @param isSrcLocal
* If the source directory is LOCAL
- * @param isAcid true if this is an ACID operation
+ * @param isAcid
+ * true if this is an ACID operation
+ * @param hasFollowingStatsTask
+ * true if there is a following task which updates the stats, so this method need not update them.
+ * @return Partition object being loaded with data
*/
public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
@@ -1631,6 +1677,7 @@ public class Hive {
throws HiveException {
Path tblDataLocationPath = tbl.getDataLocation();
try {
+ // Get the partition object if it already exists
Partition oldPart = getPartition(tbl, partSpec, false);
/**
* Move files before creating the partition since downstream processes
@@ -1668,6 +1715,12 @@ public class Hive {
List<Path> newFiles = null;
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin("MoveTask", "FileMoves");
+ // If config is set, table is not temporary and partition being inserted exists, capture
+ // the list of files added. For not yet existing partitions (insert overwrite to new partition
+ // or dynamic partition inserts), the add partition event will capture the list of files added.
+ if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) {
+ newFiles = Collections.synchronizedList(new ArrayList<Path>());
+ }
// TODO: this assumes both paths are qualified; which they are, currently.
if (mmWriteId != null && loadPath.equals(newPartPath)) {
// MM insert query, move itself is a no-op.
@@ -1678,7 +1731,8 @@ public class Hive {
}
Utilities.LOG14535.info("maybe deleting stuff from " + oldPartPath + " (new " + newPartPath + ") for replace");
if (replace && oldPartPath != null) {
- deleteOldPathForReplace(newPartPath, oldPartPath, getConf(),
+ boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+ deleteOldPathForReplace(newPartPath, oldPartPath, getConf(), isAutoPurge,
new ValidWriteIds.IdPathFilter(mmWriteId, false, true), mmWriteId != null,
tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0);
}
@@ -1693,13 +1747,10 @@ public class Hive {
}
Utilities.LOG14535.info("moving " + loadPath + " to " + destPath);
if (replace || (oldPart == null && !isAcid)) {
+ boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(),
- isSrcLocal, filter, mmWriteId != null);
+ isSrcLocal, isAutoPurge, newFiles, filter, mmWriteId != null);
} else {
- if (areEventsForDmlNeeded(tbl, oldPart)) {
- newFiles = Collections.synchronizedList(new ArrayList<Path>());
- }
-
FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
Hive.copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid, newFiles);
}
@@ -1708,13 +1759,17 @@ public class Hive {
Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
validatePartition(newTPart);
- if ((null != newFiles) || replace) {
- fireInsertEvent(tbl, partSpec, newFiles);
+
+ // Generate an insert event only if inserting into an existing partition
+ // When inserting into a new partition, the add partition event takes care of insert event
+ if ((null != oldPart) && (null != newFiles)) {
+ fireInsertEvent(tbl, partSpec, replace, newFiles);
} else {
- LOG.debug("No new files were created, and is not a replace. Skipping generating INSERT event.");
+ LOG.debug("No new files were created, and is not a replace, or we're inserting into a "
+ + "partition that does not exist yet. Skipping generating INSERT event.");
}
- //column stats will be inaccurate
+ // column stats will be inaccurate
StatsSetupConst.clearColumnStatsState(newTPart.getParameters());
// recreate the partition if it existed before
@@ -2158,7 +2213,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
Utilities.LOG14535.info("not moving " + loadPath + " to " + tbl.getPath());
if (replace) {
Path tableDest = tbl.getPath();
- deleteOldPathForReplace(tableDest, tableDest, sessionConf,
+ boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+ deleteOldPathForReplace(tableDest, tableDest, sessionConf, isAutopurge,
new ValidWriteIds.IdPathFilter(mmWriteId, false, true), mmWriteId != null,
tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0);
}
@@ -2174,8 +2230,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
Utilities.LOG14535.info("moving " + loadPath + " to " + tblPath + " (replace = " + replace + ")");
if (replace) {
+ boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
replaceFiles(tblPath, loadPath, destPath, tblPath,
- sessionConf, isSrcLocal, filter, mmWriteId != null);
+ sessionConf, isSrcLocal, isAutopurge, newFiles, filter, mmWriteId != null);
} else {
try {
FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf);
@@ -2221,7 +2278,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
commitMmTableWrite(tbl, mmWriteId);
}
- fireInsertEvent(tbl, null, newFiles);
+ fireInsertEvent(tbl, null, replace, newFiles);
}
/**
@@ -2446,7 +2503,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
else {
alterPartitionSpec(tbl, partSpec, tpart, inheritTableSpecs, partPath);
- fireInsertEvent(tbl, partSpec, newFiles);
+ fireInsertEvent(tbl, partSpec, true, newFiles);
}
}
if (tpart == null) {
@@ -2496,7 +2553,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
tpart.getSd().setLocation(partPath);
}
- private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, List<Path> newFiles)
+ private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, boolean replace, List<Path> newFiles)
throws HiveException {
if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
LOG.debug("Firing dml insert event");
@@ -2508,6 +2565,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf);
FireEventRequestData data = new FireEventRequestData();
InsertEventRequestData insertData = new InsertEventRequestData();
+ insertData.setReplace(replace);
data.setInsertData(insertData);
if (newFiles != null && newFiles.size() > 0) {
for (Path p : newFiles) {
@@ -3053,8 +3111,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (!fullDestStatus.getFileStatus().isDirectory()) {
throw new HiveException(destf + " is not a directory.");
}
- final boolean inheritPerms = HiveConf.getBoolVar(conf,
- HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
final List<Future<ObjectPair<Path, Path>>> futures = new LinkedList<>();
final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
@@ -3081,15 +3137,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
// If we do a rename for a non-local file, we will be transferring the original
// file permissions from source to the destination. Else, in case of mvFile() where we
// copy from source to destination, we will inherit the destination's parent group ownership.
- final String srcGroup = isRenameAllowed ? srcFile.getGroup() :
- fullDestStatus.getFileStatus().getGroup();
if (null == pool) {
try {
Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isRenameAllowed);
- if (inheritPerms) {
- HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false);
- }
if (null != newFiles) {
newFiles.add(destPath);
}
@@ -3104,9 +3155,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
SessionState.setCurrentSessionState(parentSession);
Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isRenameAllowed);
- if (inheritPerms) {
- HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false);
- }
+
if (null != newFiles) {
newFiles.add(destPath);
}
@@ -3116,11 +3165,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
}
- if (null == pool) {
- if (inheritPerms) {
- HdfsUtils.setFullFileStatus(conf, fullDestStatus, null, destFs, destf, true);
- }
- } else {
+ if (null != pool) {
pool.shutdown();
for (Future<ObjectPair<Path, Path>> future : futures) {
try {
@@ -3178,11 +3223,34 @@ private void constructOneLBLocationMap(FileStatus fSta,
return ShimLoader.getHadoopShims().getPathWithoutSchemeAndAuthority(path);
}
+ /**
+ * <p>
+ * Moves a file from one {@link Path} to another. If {@code isRenameAllowed} is true then the
+ * {@link FileSystem#rename(Path, Path)} method is used to move the file. If its false then the data is copied, if
+ * {@code isSrcLocal} is true then the {@link FileSystem#copyFromLocalFile(Path, Path)} method is used, else
+ * {@link FileUtils#copy(FileSystem, Path, FileSystem, Path, boolean, boolean, HiveConf)} is used.
+ * </p>
+ *
+ * <p>
+ * If the destination file already exists, then {@code _copy_[counter]} is appended to the file name, where counter
+ * is an integer starting from 1.
+ * </p>
+ *
+ * @param conf the {@link HiveConf} to use if copying data
+ * @param sourceFs the {@link FileSystem} where the source file exists
+ * @param sourcePath the {@link Path} to move
+ * @param destFs the {@link FileSystem} to move the file to
+ * @param destDirPath the {@link Path} to move the file to
+ * @param isSrcLocal if the source file is on the local filesystem
+ * @param isRenameAllowed true if the data should be renamed and not copied, false otherwise
+ *
+ * @return the {@link Path} the source file was moved to
+ *
+ * @throws IOException if there was an issue moving the file
+ */
private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, FileSystem destFs, Path destDirPath,
boolean isSrcLocal, boolean isRenameAllowed) throws IOException {
- boolean isBlobStoragePath = BlobStorageUtils.isBlobStoragePath(conf, destDirPath);
-
// Strip off the file type, if any, so we don't make:
// 000000_0.gz -> 000000_0.gz_copy_1
final String fullname = sourcePath.getName();
@@ -3192,27 +3260,19 @@ private void constructOneLBLocationMap(FileStatus fSta,
Path destFilePath = new Path(destDirPath, fullname);
/*
- * The below loop may perform bad when the destination file already exists and it has too many _copy_
- * files as well. A desired approach was to call listFiles() and get a complete list of files from
- * the destination, and check whether the file exists or not on that list. However, millions of files
- * could live on the destination directory, and on concurrent situations, this can cause OOM problems.
- *
- * I'll leave the below loop for now until a better approach is found.
- */
-
- int counter = 1;
- if (!isRenameAllowed || isBlobStoragePath) {
- while (destFs.exists(destFilePath)) {
- destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
- counter++;
- }
+ * The below loop may perform badly when the destination file already exists and it has too many _copy_
+ * files as well. A desired approach was to call listFiles() and get a complete list of files from
+ * the destination, and check whether the file exists or not on that list. However, millions of files
+ * could live in the destination directory, and in concurrent situations, this can cause OOM problems.
+ *
+ * I'll leave the below loop for now until a better approach is found.
+ */
+ for (int counter = 1; destFs.exists(destFilePath); counter++) {
+ destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
}
if (isRenameAllowed) {
- while (!destFs.rename(sourcePath, destFilePath)) {
- destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
- counter++;
- }
+ destFs.rename(sourcePath, destFilePath);
} else if (isSrcLocal) {
destFs.copyFromLocalFile(sourcePath, destFilePath);
} else {
@@ -3221,7 +3281,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
false, // overwrite destination
conf);
}
-
return destFilePath;
}
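
The _copy_ naming convention documented in the javadoc above, shown in isolation (pure string handling; a sketch of the rule, not the method itself):

public class CopySuffixSketch {
  // e.g. nextCopyName("000000_0.gz", 1) -> "000000_0_copy_1.gz"
  static String nextCopyName(String fullname, int counter) {
    int dot = fullname.lastIndexOf('.');
    String name = dot < 0 ? fullname : fullname.substring(0, dot);
    String type = dot < 0 ? "" : fullname.substring(dot + 1);
    return name + "_copy_" + counter + (type.isEmpty() ? "" : "." + type);
  }

  public static void main(String[] args) {
    System.out.println(nextCopyName("000000_0.gz", 1)); // 000000_0_copy_1.gz
    System.out.println(nextCopyName("000000_0", 2));    // 000000_0_copy_2
  }
}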
@@ -3250,12 +3309,30 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
+ // List the new files in the destination path which get copied from the source.
+ public static void listNewFilesRecursively(final FileSystem destFs, Path dest,
+ List<Path> newFiles) throws HiveException {
+ try {
+ for (FileStatus fileStatus : destFs.listStatus(dest, FileUtils.HIDDEN_FILES_PATH_FILTER)) {
+ if (fileStatus.isDirectory()) {
+ // If it is a sub-directory, then recursively list the files.
+ listNewFilesRecursively(destFs, fileStatus.getPath(), newFiles);
+ } else {
+ newFiles.add(fileStatus.getPath());
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to get source file statuses", e);
+ throw new HiveException(e.getMessage(), e);
+ }
+ }
+
// It is assumed that the parent directory of destf already exists when this
// method is called. When the replace value is true, this method works a little differently
// from the mv command: if destf is a directory, it replaces destf instead of moving under
// it. In this case, the replaced destf still preserves the original destf's permissions.
- public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf,
- boolean replace, boolean isSrcLocal) throws HiveException {
+ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, boolean replace,
+ boolean isSrcLocal) throws HiveException {
final FileSystem srcFs, destFs;
try {
destFs = destf.getFileSystem(conf);
@@ -3266,13 +3343,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
try {
srcFs = srcf.getFileSystem(conf);
} catch (IOException e) {
- LOG.error("Failed to get dest fs", e);
+ LOG.error("Failed to get src fs", e);
throw new HiveException(e.getMessage(), e);
}
- //needed for perm inheritance.
- final boolean inheritPerms = HiveConf.getBoolVar(conf,
- HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
HdfsUtils.HadoopFileStatus destStatus = null;
// If source path is a subdirectory of the destination path (or the other way around):
@@ -3284,7 +3358,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
boolean srcIsSubDirOfDest = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal),
destIsSubDirOfSrc = isSubDir(destf, srcf, destFs, srcFs, false);
try {
- if (inheritPerms || replace) {
+ if (replace) {
try{
destStatus = new HdfsUtils.HadoopFileStatus(conf, destFs, destf);
//if destf is an existing directory:
@@ -3297,10 +3371,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
LOG.debug("The path " + destf.toString() + " is deleted");
}
} catch (FileNotFoundException ignore) {
- //if dest dir does not exist, any re
- if (inheritPerms) {
- destStatus = new HdfsUtils.HadoopFileStatus(conf, destFs, destf.getParent());
- }
}
}
final HdfsUtils.HadoopFileStatus desiredStatus = destStatus;
@@ -3308,9 +3378,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (isSrcLocal) {
// For local src file, copy to hdfs
destFs.copyFromLocalFile(srcf, destf);
- if (inheritPerms) {
- HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true);
- }
return true;
} else {
if (needToCopy(srcf, destf, srcFs, destFs)) {
@@ -3347,11 +3414,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
public Void call() throws Exception {
SessionState.setCurrentSessionState(parentSession);
final String group = srcStatus.getGroup();
- if(destFs.rename(srcStatus.getPath(), destFile)) {
- if (inheritPerms) {
- HdfsUtils.setFullFileStatus(conf, desiredStatus, group, destFs, destFile, false);
- }
- } else {
+ if(!destFs.rename(srcStatus.getPath(), destFile)) {
throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest path:"
+ destFile + " returned false");
}
@@ -3360,11 +3423,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}));
}
}
- if (null == pool) {
- if (inheritPerms) {
- HdfsUtils.setFullFileStatus(conf, desiredStatus, null, destFs, destf, true);
- }
- } else {
+ if (null != pool) {
pool.shutdown();
for (Future<Void> future : futures) {
try {
@@ -3379,9 +3438,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
return true;
} else {
if (destFs.rename(srcf, destf)) {
- if (inheritPerms) {
- HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true);
- }
return true;
}
return false;
@@ -3432,12 +3488,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
*/
static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
FileSystem fs, boolean isSrcLocal, boolean isAcid, List<Path> newFiles) throws HiveException {
- boolean inheritPerms = HiveConf.getBoolVar(conf,
- HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
try {
// create the destination if it does not exist
if (!fs.exists(destf)) {
- FileUtils.mkdir(fs, destf, inheritPerms, conf);
+ FileUtils.mkdir(fs, destf, conf);
}
} catch (IOException e) {
throw new HiveException(
@@ -3571,11 +3625,16 @@ private void constructOneLBLocationMap(FileStatus fSta,
* @param oldPath
* The directory of the old data location that needs to be cleaned up. Most of the time it will be the same
* as destf, unless it is across FileSystem boundaries.
+ * @param purge
+ * When set to true, files which need to be deleted are not moved to the Trash
* @param isSrcLocal
* If the source directory is LOCAL
+ * @param newFiles
+ * Output parameter collecting the list of new files copied into the destination path
*/
protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
- boolean isSrcLocal, PathFilter deletePathFilter, boolean isMmTable) throws HiveException {
+ boolean isSrcLocal, boolean purge, List<Path> newFiles, PathFilter deletePathFilter,
+ boolean isMmTable) throws HiveException {
try {
FileSystem destFs = destf.getFileSystem(conf);
@@ -3596,14 +3655,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (oldPath != null) {
// TODO: we assume lbLevels is 0 here. Same as old code for non-MM.
// For MM tables, this can only be a LOAD command. Does LOAD even support LB?
- deleteOldPathForReplace(destf, oldPath, conf, deletePathFilter, isMmTable, 0);
+ deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, isMmTable, 0);
}
// first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates
- // destf with inherited permissions
- boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars
- .HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
- boolean destfExist = FileUtils.mkdir(destFs, destf, inheritPerms, conf);
+ // destf
+ boolean destfExist = FileUtils.mkdir(destFs, destf, conf);
if(!destfExist) {
throw new IOException("Directory " + destf.toString()
+ " does not exist and could not be created.");
@@ -3619,11 +3676,23 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal)) {
throw new IOException("Error moving: " + srcf + " into: " + destf);
}
- } else { // its either a file or glob
+
+ // Add file paths of the files that will be moved to the destination if the caller needs them
+ if (null != newFiles) {
+ listNewFilesRecursively(destFs, destf, newFiles);
+ }
+ } else {
+ // it's either a file or a glob
for (FileStatus src : srcs) {
- if (!moveFile(conf, src.getPath(), new Path(destf, src.getPath().getName()), true, isSrcLocal)) {
+ Path destFile = new Path(destf, src.getPath().getName());
+ if (!moveFile(conf, src.getPath(), destFile, true, isSrcLocal)) {
throw new IOException("Error moving: " + srcf + " into: " + destf);
}
+
+ // Add file paths of the files that will be moved to the destination if the caller needs them
+ if (null != newFiles) {
+ newFiles.add(destFile);
+ }
}
}
} catch (IOException e) {
@@ -3631,7 +3700,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
- private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf,
+ private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, boolean purge,
PathFilter pathFilter, boolean isMmTable, int lbLevels) throws HiveException {
Utilities.LOG14535.info("Deleting old paths for replace in " + destPath + " and old path " + oldPath);
boolean isOldPathUnderDestf = false;
@@ -3645,7 +3714,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
isOldPathUnderDestf = isSubDir(oldPath, destPath, oldFs, destFs, false);
if (isOldPathUnderDestf || isMmTable) {
if (lbLevels == 0 || !isMmTable) {
- cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf);
+ cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf, purge);
} else {
// We need to clean up different MM IDs from each LB directory separately.
// Avoid temporary directories in the immediate table/part dir.
@@ -3663,7 +3732,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
throw new HiveException("Unexpected path during overwrite: " + lbPath);
}
Utilities.LOG14535.info("Cleaning up LB directory " + lbPath);
- cleanUpOneDirectoryForReplace(lbPath, oldFs, pathFilter, conf);
+ cleanUpOneDirectoryForReplace(lbPath, oldFs, pathFilter, conf, purge);
}
}
}
@@ -3681,7 +3750,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
private void cleanUpOneDirectoryForReplace(Path path, FileSystem fs,
- PathFilter pathFilter, HiveConf conf) throws IOException, HiveException {
+ PathFilter pathFilter, HiveConf conf, boolean purge) throws IOException, HiveException {
FileStatus[] statuses = fs.listStatus(path, pathFilter);
if (statuses == null || statuses.length == 0) return;
String s = "Deleting files under " + path + " for replace: ";
@@ -3689,7 +3758,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
s += file.getPath().getName() + ", ";
}
Utilities.LOG14535.info(s);
- if (!trashFiles(fs, statuses, conf)) {
+ if (!trashFiles(fs, statuses, conf, purge)) {
throw new HiveException("Old path " + path + " has not been cleaned up.");
}
}
@@ -3703,7 +3772,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
* @return true if deletion successful
* @throws IOException
*/
- public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuses, final Configuration conf)
+ public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuses,
+ final Configuration conf, final boolean purge)
throws IOException {
boolean result = true;
@@ -3717,13 +3787,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
final SessionState parentSession = SessionState.get();
for (final FileStatus status : statuses) {
if (null == pool) {
- result &= FileUtils.moveToTrash(fs, status.getPath(), conf);
+ result &= FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
} else {
futures.add(pool.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
SessionState.setCurrentSessionState(parentSession);
- return FileUtils.moveToTrash(fs, status.getPath(), conf);
+ return FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
}
}));
}
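
FileUtils.moveToTrash(fs, path, conf, purge) itself is not part of this diff; a sketch of the purge semantics the new flag implies, written against the stock Hadoop Trash API (assumption: purge==true bypasses the trash and deletes permanently):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;

public class MoveToTrashSketch {
  static boolean moveToTrash(FileSystem fs, Path p, Configuration conf, boolean purge)
      throws IOException {
    if (!purge && Trash.moveToAppropriateTrash(fs, p, conf)) {
      return true; // moved into the user's trash checkpoint
    }
    // purge requested, or trash disabled: delete recursively and permanently
    return fs.delete(p, true);
  }
}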
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
index 1d78b4c..b121eea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.calcite.adapter.druid.DruidQuery;
import org.apache.calcite.adapter.druid.DruidSchema;
import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.calcite.adapter.druid.LocalInterval;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptMaterialization;
@@ -310,7 +311,7 @@ public final class HiveMaterializedViewsRegistry {
}
metrics.add(field.getName());
}
- List<Interval> intervals = Arrays.asList(DruidTable.DEFAULT_INTERVAL);
+ List<LocalInterval> intervals = Arrays.asList(DruidTable.DEFAULT_INTERVAL);
DruidTable druidTable = new DruidTable(new DruidSchema(address, address, false),
dataSource, RelDataTypeImpl.proto(rowType), metrics, DruidTable.DEFAULT_TIMESTAMP_COLUMN, intervals);
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 6805c17..4add836 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.metadata;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -35,7 +36,10 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import com.google.common.collect.Sets;
+import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
@@ -49,8 +53,6 @@ import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult;
-import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
-import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.thrift.TException;
import com.google.common.util.concurrent.MoreExecutors;
@@ -64,6 +66,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class HiveMetaStoreChecker {
public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreChecker.class);
+ public static final String CLASS_NAME = HiveMetaStoreChecker.class.getName();
private final Hive hive;
private final HiveConf conf;
@@ -208,19 +211,28 @@ public class HiveMetaStoreChecker {
return;
}
- List<Partition> parts = new ArrayList<Partition>();
+ PartitionIterable parts;
boolean findUnknownPartitions = true;
if (table.isPartitioned()) {
if (partitions == null || partitions.isEmpty()) {
- PrunedPartitionList prunedPartList =
- PartitionPruner.prune(table, null, conf, toString(), null);
- // no partitions specified, let's get all
- parts.addAll(prunedPartList.getPartitions());
+ String mode = HiveConf.getVar(conf, ConfVars.HIVEMAPREDMODE, (String) null);
+ if ("strict".equalsIgnoreCase(mode)) {
+ parts = new PartitionIterable(hive, table, null, conf.getIntVar(
+ HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
+ } else {
+ List<Partition> loadedPartitions = new ArrayList<>();
+ PerfLogger perfLogger = SessionState.getPerfLogger();
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARTITION_RETRIEVING);
+ loadedPartitions.addAll(hive.getAllPartitionsOf(table));
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARTITION_RETRIEVING);
+ parts = new PartitionIterable(loadedPartitions);
+ }
} else {
// we're interested in specific partitions,
// don't check for any others
findUnknownPartitions = false;
+ List<Partition> loadedPartitions = new ArrayList<>();
for (Map<String, String> map : partitions) {
Partition part = hive.getPartition(table, map, false);
if (part == null) {
@@ -229,10 +241,13 @@ public class HiveMetaStoreChecker {
pr.setPartitionName(Warehouse.makePartPath(map));
result.getPartitionsNotInMs().add(pr);
} else {
- parts.add(part);
+ loadedPartitions.add(part);
}
}
+ parts = new PartitionIterable(loadedPartitions);
}
+ } else {
+ parts = new PartitionIterable(Collections.<Partition>emptyList());
}
checkTable(table, parts, findUnknownPartitions, result);
@@ -255,7 +270,7 @@ public class HiveMetaStoreChecker {
* @throws HiveException
* Could not create Partition object
*/
- void checkTable(Table table, List<Partition> parts,
+ void checkTable(Table table, PartitionIterable parts,
boolean findUnknownPartitions, CheckResult result) throws IOException,
HiveException {
@@ -284,7 +299,9 @@ public class HiveMetaStoreChecker {
}
for (int i = 0; i < partition.getSpec().size(); i++) {
- partPaths.add(partPath.makeQualified(fs));
+ Path qualifiedPath = partPath.makeQualified(fs);
+ StringInternUtils.internUriStringsInPath(qualifiedPath);
+ partPaths.add(qualifiedPath);
partPath = partPath.getParent();
}
}
@@ -314,7 +331,7 @@ public class HiveMetaStoreChecker {
// now check the table folder and see if we find anything
// that isn't in the metastore
Set<Path> allPartDirs = new HashSet<Path>();
- checkPartitionDirs(tablePath, allPartDirs, table.getPartCols().size());
+ checkPartitionDirs(tablePath, allPartDirs, Collections.unmodifiableList(table.getPartColNames()));
// don't want the table dir
allPartDirs.remove(tablePath);
@@ -398,14 +415,14 @@ public class HiveMetaStoreChecker {
* Start directory
* @param allDirs
* This set will contain the leaf paths at the end.
- * @param maxDepth
+ * @param partColNames
* Specify how deep the search goes.
* @throws IOException
* Thrown if we can't get lists from the fs.
* @throws HiveException
*/
- private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth) throws IOException, HiveException {
+ private void checkPartitionDirs(Path basePath, Set<Path> allDirs, final List<String> partColNames) throws IOException, HiveException {
// Here we just reuse the THREAD_COUNT configuration for
// METASTORE_FS_HANDLER_THREADS_COUNT since this results in better performance
// The number of missing partitions discovered are later added by metastore using a
@@ -423,21 +440,21 @@ public class HiveMetaStoreChecker {
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build();
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, threadFactory);
}
- checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), maxDepth);
+ checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), partColNames);
executor.shutdown();
}
private final class PathDepthInfoCallable implements Callable<Path> {
- private final int maxDepth;
+ private final List<String> partColNames;
private final FileSystem fs;
private final ConcurrentLinkedQueue<PathDepthInfo> pendingPaths;
private final boolean throwException;
private final PathDepthInfo pd;
- private PathDepthInfoCallable(PathDepthInfo pd, int maxDepth, FileSystem fs,
+ private PathDepthInfoCallable(PathDepthInfo pd, List<String> partColNames, FileSystem fs,
ConcurrentLinkedQueue<PathDepthInfo> basePaths) {
- this.maxDepth = maxDepth;
+ this.partColNames = partColNames;
this.pd = pd;
this.fs = fs;
this.pendingPaths = basePaths;
@@ -457,39 +474,50 @@ public class HiveMetaStoreChecker {
FileStatus[] fileStatuses = fs.listStatus(currentPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
// found no files under a sub-directory under table base path; it is possible that the table
// is empty and hence there are no partition sub-directories created under base path
- if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth < maxDepth) {
+ if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth < partColNames.size()) {
// since maxDepth is not yet reached, we are missing partition
// columns in currentPath
- if (throwException) {
- throw new HiveException(
- "MSCK is missing partition columns under " + currentPath.toString());
- } else {
- LOG.warn("MSCK is missing partition columns under " + currentPath.toString());
- }
+ logOrThrowExceptionWithMsg(
+ "MSCK is missing partition columns under " + currentPath.toString());
} else {
// found files under currentPath add them to the queue if it is a directory
for (FileStatus fileStatus : fileStatuses) {
- if (!fileStatus.isDirectory() && currentDepth < maxDepth) {
+ if (!fileStatus.isDirectory() && currentDepth < partColNames.size()) {
// found a file at depth which is less than number of partition keys
- if (throwException) {
- throw new HiveException(
- "MSCK finds a file rather than a directory when it searches for "
- + fileStatus.getPath().toString());
+ logOrThrowExceptionWithMsg(
+ "MSCK finds a file rather than a directory when it searches for "
+ + fileStatus.getPath().toString());
+ } else if (fileStatus.isDirectory() && currentDepth < partColNames.size()) {
+ // found a sub-directory at a depth less than number of partition keys
+ // validate if the partition directory name matches with the corresponding
+ // partition colName at currentDepth
+ Path nextPath = fileStatus.getPath();
+ String[] parts = nextPath.getName().split("=");
+ if (parts.length != 2) {
+ logOrThrowExceptionWithMsg("Invalid partition name " + nextPath);
+ } else if (!parts[0].equalsIgnoreCase(partColNames.get(currentDepth))) {
+ logOrThrowExceptionWithMsg(
+ "Unexpected partition key " + parts[0] + " found at " + nextPath);
} else {
- LOG.warn("MSCK finds a file rather than a directory when it searches for "
- + fileStatus.getPath().toString());
+ // add sub-directory to the work queue if the partition column depth is not yet reached
+ pendingPaths.add(new PathDepthInfo(nextPath, currentDepth + 1));
}
- } else if (fileStatus.isDirectory() && currentDepth < maxDepth) {
- // add sub-directory to the work queue if maxDepth is not yet reached
- pendingPaths.add(new PathDepthInfo(fileStatus.getPath(), currentDepth + 1));
}
}
- if (currentDepth == maxDepth) {
+ if (currentDepth == partColNames.size()) {
return currentPath;
}
}
return null;
}
+
+ private void logOrThrowExceptionWithMsg(String msg) throws HiveException {
+ if (throwException) {
+ throw new HiveException(msg);
+ } else {
+ LOG.warn(msg);
+ }
+ }
}
private static class PathDepthInfo {
@@ -503,7 +531,7 @@ public class HiveMetaStoreChecker {
private void checkPartitionDirs(final ExecutorService executor,
final Path basePath, final Set<Path> result,
- final FileSystem fs, final int maxDepth) throws HiveException {
+ final FileSystem fs, final List<String> partColNames) throws HiveException {
try {
Queue<Future<Path>> futures = new LinkedList<Future<Path>>();
ConcurrentLinkedQueue<PathDepthInfo> nextLevel = new ConcurrentLinkedQueue<>();
@@ -520,7 +548,7 @@ public class HiveMetaStoreChecker {
//process each level in parallel
while(!nextLevel.isEmpty()) {
futures.add(
- executor.submit(new PathDepthInfoCallable(nextLevel.poll(), maxDepth, fs, tempQueue)));
+ executor.submit(new PathDepthInfoCallable(nextLevel.poll(), partColNames, fs, tempQueue)));
}
while(!futures.isEmpty()) {
Path p = futures.poll().get();
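
The partition-directory name validation added above, reduced to its core rule (a sketch; the real MSCK path also handles escaping and reports problems via logOrThrowExceptionWithMsg):

import java.util.Arrays;
import java.util.List;

public class PartitionDirNameCheck {
  // Returns null when dirName is a valid partition directory at this depth,
  // otherwise a message describing the problem.
  static String validate(String dirName, List<String> partColNames, int depth) {
    String[] parts = dirName.split("=");
    if (parts.length != 2) {
      return "Invalid partition name " + dirName;
    }
    if (!parts[0].equalsIgnoreCase(partColNames.get(depth))) {
      return "Unexpected partition key " + parts[0] + " found at " + dirName;
    }
    return null;
  }

  public static void main(String[] args) {
    List<String> cols = Arrays.asList("ds", "hr");
    System.out.println(validate("ds=2017-05-05", cols, 0)); // null (valid)
    System.out.println(validate("hr=00", cols, 0));         // unexpected key
    System.out.println(validate("junk", cols, 1));          // invalid name
  }
}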
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index 8eb011e..a319b88 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -391,7 +391,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
throw new MetaException("Temp table path not set for " + tbl.getTableName());
} else {
if (!wh.isDir(tblPath)) {
- if (!wh.mkdirs(tblPath, true)) {
+ if (!wh.mkdirs(tblPath)) {
throw new MetaException(tblPath
+ " is not a directory or unable to create one");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index c53ddad..5efaf70 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -18,15 +18,19 @@
package org.apache.hadoop.hive.ql.metadata;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -504,6 +508,14 @@ public class Table implements Serializable {
return null;
}
+ public List<String> getPartColNames() {
+ List<String> partColNames = new ArrayList<String>();
+ for (FieldSchema key : getPartCols()) {
+ partColNames.add(key.getName());
+ }
+ return partColNames;
+ }
+
public boolean isPartitionKey(String colName) {
return getPartColByName(colName) == null ? false : true;
}
@@ -936,6 +948,16 @@ public class Table implements Serializable {
}
}
+ public boolean isEmpty() throws HiveException {
+ Preconditions.checkNotNull(getPath());
+ try {
+ FileSystem fs = FileSystem.get(getPath().toUri(), SessionState.getSessionConf());
+ return !fs.exists(getPath()) || fs.listStatus(getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER).length == 0;
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ }
+
public boolean isTemporary() {
return tTable.isTemporary();
}
@@ -964,7 +986,7 @@ public class Table implements Serializable {
public static void validateColumns(List<FieldSchema> columns, List<FieldSchema> partCols)
throws HiveException {
- List<String> colNames = new ArrayList<String>();
+ Set<String> colNames = new HashSet<>();
for (FieldSchema partCol: columns) {
String colName = normalize(partCol.getName());
if (colNames.contains(colName)) {
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
index 044d64c..2435bf1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
@@ -60,7 +60,7 @@ public enum VirtualColumn {
*/
GROUPINGID("GROUPING__ID", TypeInfoFactory.intTypeInfo);
- public static ImmutableSet<String> VIRTUAL_COLUMN_NAMES =
+ public static final ImmutableSet<String> VIRTUAL_COLUMN_NAMES =
ImmutableSet.of(FILENAME.getName(), BLOCKOFFSET.getName(), ROWOFFSET.getName(),
RAWDATASIZE.getName(), GROUPINGID.getName(), ROWID.getName());
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
index 7e39d77..d59603e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
@@ -158,14 +158,15 @@ public class ColumnPruner extends Transform {
boolean walkChildren = true;
opStack.push(nd);
- // no need to go further down for a select op with a file sink or script
- // child
- // since all cols are needed for these ops
+ // no need to go further down for a select op whose children are all file sink
+ // or script operators, since all cols are needed for these ops.
+ // However, if one of the children is not a file sink or script, we still go down.
if (nd instanceof SelectOperator) {
+ walkChildren = false;
for (Node child : nd.getChildren()) {
- if ((child instanceof FileSinkOperator)
- || (child instanceof ScriptOperator)) {
- walkChildren = false;
+ if (!(child instanceof FileSinkOperator || child instanceof ScriptOperator)) {
+ walkChildren = true;
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
index 00ec03e..45839ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
@@ -671,10 +671,11 @@ public final class ColumnPrunerProcFactory {
List<FieldNode> colsAfterReplacement = new ArrayList<>();
List<FieldNode> newCols = new ArrayList<>();
- for (FieldNode col : cols) {
- int index = outputCols.indexOf(col.getFieldName());
+ for (int index = 0; index < numSelColumns; index++) {
+ String colName = outputCols.get(index);
+ FieldNode col = lookupColumn(cols, colName);
// colExprMap.size() == size of cols from SEL(*) branch
- if (index >= 0 && index < numSelColumns) {
+ if (col != null) {
ExprNodeDesc transformed = colExprMap.get(col.getFieldName());
colsAfterReplacement = mergeFieldNodesWithDesc(colsAfterReplacement, transformed);
newCols.add(col);
@@ -713,12 +714,14 @@ public final class ColumnPrunerProcFactory {
RowSchema rs = op.getSchema();
ArrayList<ExprNodeDesc> colList = new ArrayList<>();
List<FieldNode> outputCols = new ArrayList<>();
- for (FieldNode col : cols) {
- // revert output cols of SEL(*) to ExprNodeColumnDesc
- ColumnInfo colInfo = rs.getColumnInfo(col.getFieldName());
- ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo);
- colList.add(colExpr);
- outputCols.add(col);
+ for (ColumnInfo colInfo : rs.getSignature()) {
+ FieldNode col = lookupColumn(cols, colInfo.getInternalName());
+ if (col != null) {
+ // revert output cols of SEL(*) to ExprNodeColumnDesc
+ ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo);
+ colList.add(colExpr);
+ outputCols.add(col);
+ }
}
// replace SEL(*) to SEL(exprs)
((SelectDesc)select.getConf()).setSelStarNoCompute(false);
@@ -810,11 +813,18 @@ public final class ColumnPrunerProcFactory {
ArrayList<String> newOutputColumnNames = new ArrayList<String>();
ArrayList<ColumnInfo> rs_oldsignature = op.getSchema().getSignature();
ArrayList<ColumnInfo> rs_newsignature = new ArrayList<ColumnInfo>();
+ // The pruning needs to preserve the order of columns in the input schema
+ Set<String> colNames = new HashSet<String>();
for (FieldNode col : cols) {
- int index = originalOutputColumnNames.indexOf(col.getFieldName());
- newOutputColumnNames.add(col.getFieldName());
- newColList.add(originalColList.get(index));
- rs_newsignature.add(rs_oldsignature.get(index));
+ colNames.add(col.getFieldName());
+ }
+ for (int i = 0; i < originalOutputColumnNames.size(); i++) {
+ String colName = originalOutputColumnNames.get(i);
+ if (colNames.contains(colName)) {
+ newOutputColumnNames.add(colName);
+ newColList.add(originalColList.get(i));
+ rs_newsignature.add(rs_oldsignature.get(i));
+ }
}
op.getSchema().setSignature(rs_newsignature);
conf.setColList(newColList);
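
The pruning above walks the original schema order rather than the pruned column set, which is what preserves column order. The same pattern in isolation (a standalone sketch, not Hive code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OrderPreservingPrune {
  // Keep entries of 'original', in their original order, whose name is in 'wanted'.
  static List<String> prune(List<String> original, Set<String> wanted) {
    List<String> out = new ArrayList<>();
    for (String col : original) {
      if (wanted.contains(col)) {
        out.add(col);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> schema = Arrays.asList("_col0", "_col1", "_col2");
    Set<String> wanted = new HashSet<>(Arrays.asList("_col2", "_col0"));
    System.out.println(prune(schema, wanted)); // [_col0, _col2]: schema order wins
  }
}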
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index e68618a..d0fdb52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
import org.apache.hadoop.hive.ql.parse.GenTezUtils;
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
@@ -68,6 +69,8 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* ConvertJoinMapJoin is an optimization that replaces a common join
* (aka shuffle join) with a map join (aka broadcast or fragment replicate
@@ -95,15 +98,19 @@ public class ConvertJoinMapJoin implements NodeProcessor {
JoinOperator joinOp = (JoinOperator) nd;
long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+ // adjust noconditional task size threshold for LLAP
+ maxSize = getNoConditionalTaskSizeForLlap(maxSize, context.conf);
+ joinOp.getConf().setNoConditionalTaskSize(maxSize);
+
TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+ Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
if (retval == null) {
return retval;
} else {
- fallbackToReduceSideJoin(joinOp, context);
+ fallbackToReduceSideJoin(joinOp, context, maxSize);
return null;
}
}
@@ -120,13 +127,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
LOG.info("Estimated number of buckets " + numBuckets);
int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize, true);
if (mapJoinConversionPos < 0) {
- Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+ Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
if (retval == null) {
return retval;
} else {
// only case is full outer join with SMB enabled which is not possible. Convert to regular
// join.
- fallbackToReduceSideJoin(joinOp, context);
+ fallbackToReduceSideJoin(joinOp, context, maxSize);
return null;
}
}
@@ -147,7 +154,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
if (mapJoinConversionPos < 0) {
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- fallbackToReduceSideJoin(joinOp, context);
+ fallbackToReduceSideJoin(joinOp, context, maxSize);
return null;
}
@@ -164,15 +171,54 @@ public class ConvertJoinMapJoin implements NodeProcessor {
return null;
}
+ @VisibleForTesting
+ public long getNoConditionalTaskSizeForLlap(final long maxSize, final HiveConf conf) {
+ if ("llap".equalsIgnoreCase(conf.getVar(ConfVars.HIVE_EXECUTION_MODE))) {
+ LlapClusterStateForCompile llapInfo = LlapClusterStateForCompile.getClusterInfo(conf);
+ llapInfo.initClusterInfo();
+ final int executorsPerNode;
+ if (!llapInfo.hasClusterInfo()) {
+ LOG.warn("LLAP cluster information not available. Falling back to getting #executors from hiveconf..");
+ executorsPerNode = conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+ } else {
+ final int numExecutorsPerNodeFromCluster = llapInfo.getNumExecutorsPerNode();
+ if (numExecutorsPerNodeFromCluster == -1) {
+ LOG.warn("Cannot determine executor count from LLAP cluster information. Falling back to getting #executors" +
+ " from hiveconf..");
+ executorsPerNode = conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+ } else {
+ executorsPerNode = numExecutorsPerNodeFromCluster;
+ }
+ }
+ final int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
+ if (numSessions > 0) {
+ final int availableSlotsPerQuery = (int) ((double) executorsPerNode / numSessions);
+ final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
+ final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY);
+ final int slotsPerQuery = Math.min(maxSlotsPerQuery, availableSlotsPerQuery);
+ final long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery));
+ LOG.info("No conditional task size adjusted for LLAP. executorsPerNode: {}, numSessions: {}, " +
+ "availableSlotsPerQuery: {}, overSubscriptionFactor: {}, maxSlotsPerQuery: {}, slotsPerQuery: {}, " +
+ "noconditionalTaskSize: {}, adjustedNoconditionalTaskSize: {}", executorsPerNode, numSessions,
+ availableSlotsPerQuery, overSubscriptionFactor, maxSlotsPerQuery, slotsPerQuery, maxSize, llapMaxSize);
+ return Math.max(maxSize, llapMaxSize);
+ } else {
+ LOG.warn(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname + " returned value {}. Returning {}" +
+ " as no conditional task size for LLAP.", numSessions, maxSize);
+ }
+ }
+ return maxSize;
+ }
+
@SuppressWarnings("unchecked")
private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp,
- TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+ TezBucketJoinProcCtx tezBucketJoinProcCtx, final long maxSize) throws SemanticException {
// we cannot convert to bucket map join, we cannot convert to
// map join either based on the size. Check if we can convert to SMB join.
if ((HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false)
|| ((!HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN_REDUCE))
&& joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
- fallbackToReduceSideJoin(joinOp, context);
+ fallbackToReduceSideJoin(joinOp, context, maxSize);
return null;
}
Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
@@ -201,7 +247,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// contains aliases from sub-query
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- fallbackToReduceSideJoin(joinOp, context);
+ fallbackToReduceSideJoin(joinOp, context, maxSize);
return null;
}
@@ -211,7 +257,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
} else {
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- fallbackToReduceSideJoin(joinOp, context);
+ fallbackToReduceSideJoin(joinOp, context, maxSize);
}
return null;
}
@@ -235,7 +281,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
joinOp.getConf().getBaseSrc(), joinOp).getSecond(),
null, joinDesc.getExprs(), null, null,
joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
- joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
+ joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null, joinDesc.getNoConditionalTaskSize());
mapJoinDesc.setNullSafes(joinDesc.getNullSafes());
mapJoinDesc.setFilterMap(joinDesc.getFilterMap());
mapJoinDesc.setResidualFilterExprs(joinDesc.getResidualFilterExprs());
@@ -794,7 +840,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// The semijoin branch can potentially create a task level cycle
// with the hashjoin except when it is dynamically partitioned hash
// join which takes place in a separate task.
- if (context.parseContext.getRsOpToTsOpMap().size() > 0
+ if (context.parseContext.getRsToSemiJoinBranchInfo().size() > 0
&& removeReduceSink) {
removeCycleCreatingSemiJoinOps(mapJoinOp, parentSelectOpOfBigTableOp,
context.parseContext);
@@ -826,7 +872,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
ReduceSinkOperator rs = (ReduceSinkOperator) op;
- TableScanOperator ts = parseContext.getRsOpToTsOpMap().get(rs);
+ TableScanOperator ts = parseContext.getRsToSemiJoinBranchInfo().get(rs).getTsOp();
if (ts == null) {
// skip, no semijoin branch
continue;
@@ -851,6 +897,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
if (semiJoinMap.size() > 0) {
for (ReduceSinkOperator rs : semiJoinMap.keySet()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found semijoin optimization from the big table side of a map join, which will cause a task cycle. "
+ + "Removing semijoin "
+ + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(semiJoinMap.get(rs)));
+ }
GenTezUtils.removeBranch(rs);
GenTezUtils.removeSemiJoinOperator(parseContext, rs,
semiJoinMap.get(rs));
@@ -923,15 +974,14 @@ public class ConvertJoinMapJoin implements NodeProcessor {
return numBuckets;
}
- private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+ private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+ final long maxSize)
throws SemanticException {
// Attempt dynamic partitioned hash join
// Since we don't have big table index yet, must start with estimate of numReducers
int numReducers = estimateNumBuckets(joinOp, false);
LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
- int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false,
- context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD),
- false);
+ int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false, maxSize, false);
if (bigTablePos >= 0) {
// Now that we have the big table index, get real numReducers value based on big table RS
ReduceSinkOperator bigTableParentRS =
@@ -966,11 +1016,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
return false;
}
- private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+ private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context, final long maxSize)
throws SemanticException {
if (context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &&
context.conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN)) {
- if (convertJoinDynamicPartitionedHashJoin(joinOp, context)) {
+ if (convertJoinDynamicPartitionedHashJoin(joinOp, context, maxSize)) {
return;
}
}
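The LLAP threshold adjustment introduced in getNoConditionalTaskSizeForLlap above reduces to two lines of arithmetic: slotsPerQuery = min(maxSlotsPerQuery, executorsPerNode / numSessions), then adjusted = maxSize + maxSize * overSubscriptionFactor * slotsPerQuery, floored at the original maxSize. A self-contained sketch of the same formula; the constants in main are made-up example values, not Hive defaults:

public class LlapTaskSizeExample {
  // Mirrors the formula in getNoConditionalTaskSizeForLlap:
  //   slotsPerQuery = min(maxSlotsPerQuery, executorsPerNode / numSessions)
  //   adjusted      = maxSize + maxSize * overSubscriptionFactor * slotsPerQuery
  static long adjust(long maxSize, int executorsPerNode, int numSessions,
                     double overSubscriptionFactor, int maxSlotsPerQuery) {
    if (numSessions <= 0) {
      return maxSize;                        // same fallback as the patch
    }
    int availableSlotsPerQuery = (int) ((double) executorsPerNode / numSessions);
    int slotsPerQuery = Math.min(maxSlotsPerQuery, availableSlotsPerQuery);
    long adjusted = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery));
    return Math.max(maxSize, adjusted);      // never shrink the threshold
  }

  public static void main(String[] args) {
    // Example: 10 MB threshold, 12 executors/node, 4 sessions, factor 0.2, cap 3.
    // slotsPerQuery = min(3, 12/4) = 3; adjusted = 10MB + 10MB * 0.2 * 3 = 16MB.
    System.out.println(adjust(10L << 20, 12, 4, 0.2, 3)); // 16777216
  }
}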
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index b6db6aa..b8c0102 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.optimizer;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -42,12 +43,7 @@ import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
-import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.*;
import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
import org.apache.hadoop.hive.ql.plan.*;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
@@ -214,16 +210,34 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
} else {
LOG.debug("Column " + column + " is not a partition column");
if (semiJoin && ts.getConf().getFilterExpr() != null) {
- LOG.debug("Initiate semijoin reduction for " + column);
- // Get the table name from which the min-max values will come.
+ LOG.debug("Initiate semijoin reduction for " + column + " ("
+ + ts.getConf().getFilterExpr().getExprString());
+ // Get the table name from which the min-max values and bloom filter will come.
Operator<?> op = ctx.generator;
+
while (!(op == null || op instanceof TableScanOperator)) {
op = op.getParentOperators().get(0);
}
String tableAlias = (op == null ? "" : ((TableScanOperator) op).getConf().getAlias());
+
keyBaseAlias = ctx.generator.getOperatorId() + "_" + tableAlias + "_" + column;
- semiJoinAttempted = generateSemiJoinOperatorPlan(ctx, parseContext, ts, keyBaseAlias);
+ Map<String, SemiJoinHint> hints = parseContext.getSemiJoinHints();
+ if (hints != null) {
+ // A non-null but empty hints map implies that the user enforced
+ // no runtime filtering.
+ if (hints.size() > 0) {
+ SemiJoinHint sjHint = hints.get(tableAlias);
+ semiJoinAttempted = generateSemiJoinOperatorPlan(
+ ctx, parseContext, ts, keyBaseAlias, sjHint);
+ if (!semiJoinAttempted && sjHint != null) {
+ throw new SemanticException("The user hint to enforce semijoin failed to meet the required conditions");
+ }
+ }
+ } else {
+ semiJoinAttempted = generateSemiJoinOperatorPlan(
+ ctx, parseContext, ts, keyBaseAlias, null);
+ }
}
}
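The hint handling added in this hunk distinguishes three cases: no hints map at all (default behavior), a non-null but empty map (the user disabled runtime filtering), and a populated map (attempt the plan with the per-alias hint, and fail hard if a hinted attempt is rejected). A condensed restatement of that gating; SemiJoinHint here is a stand-in class and attempt() is a placeholder for generateSemiJoinOperatorPlan, which may decline for reasons the stub does not model:

import java.util.Map;

public class SemiJoinHintGating {
  // Stand-in for org.apache.hadoop.hive.ql.parse.SemiJoinHint.
  static class SemiJoinHint {}

  // Placeholder for generateSemiJoinOperatorPlan(...); real code may decline.
  static boolean attempt(SemiJoinHint hint) { return hint != null; }

  static boolean tryReduction(Map<String, SemiJoinHint> hints, String alias) {
    if (hints == null) {
      return attempt(null);            // no hint map: default behavior applies
    }
    if (hints.isEmpty()) {
      return false;                    // empty map: user disabled runtime filtering
    }
    SemiJoinHint sjHint = hints.get(alias);
    boolean attempted = attempt(sjHint);
    if (!attempted && sjHint != null) {
      // the real code throws SemanticException at this point
      throw new IllegalStateException("hinted semijoin failed required conditions");
    }
    return attempted;
  }
}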
@@ -386,7 +400,13 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
// Generates plan for min/max when dynamic partition pruning is ruled out.
private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContext parseContext,
- TableScanOperator ts, String keyBaseAlias) throws SemanticException {
+ TableScanOperator ts, String keyBaseAlias, SemiJoinHint sjHint) throws SemanticException {
+
+ // If semijoin hint is enforced, make sure hint is provided
+ if (parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY)
+ && sjHint == null) {
+ return false;
+ }
// we will put a fork in the plan at the source of the reduce sink
Operator<? extends OperatorDesc> parentOfRS = ctx.generator.getParentOperators().get(0);
@@ -402,33 +422,62 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
exprNodeDesc = exprNodeDesc.getChildren().get(0);
}
- if (exprNodeDesc instanceof ExprNodeColumnDesc) {
- internalColName = ((ExprNodeColumnDesc) exprNodeDesc).getColumn();
- if (parentOfRS instanceof SelectOperator) {
- // Make sure the semijoin branch is not on parition column.
- ExprNodeColumnDesc colExpr = ((ExprNodeColumnDesc) (parentOfRS.
- getColumnExprMap().get(internalColName)));
- String colName = ExprNodeDescUtils.extractColName(colExpr);
-
- // Fetch the TableScan Operator.
- Operator<?> op = parentOfRS.getParentOperators().get(0);
- while (op != null && !(op instanceof TableScanOperator)) {
- op = op.getParentOperators().get(0);
- }
- assert op != null;
-
- Table table = ((TableScanOperator) op).getConf().getTableMetadata();
- if (table.isPartitionKey(colName)) {
- // The column is partition column, skip the optimization.
- return false;
- }
- }
- } else {
+ if (!(exprNodeDesc instanceof ExprNodeColumnDesc)) {
// No column found!
// Bail out
return false;
}
+ internalColName = ((ExprNodeColumnDesc) exprNodeDesc).getColumn();
+ if (parentOfRS instanceof SelectOperator) {
+ // Make sure the semijoin branch is not on a partition column.
+ ExprNodeDesc expr = parentOfRS.getColumnExprMap().get(internalColName);
+ while (!(expr instanceof ExprNodeColumnDesc) &&
+ (expr.getChildren() != null)) {
+ expr = expr.getChildren().get(0);
+ }
+
+ if (!(expr instanceof ExprNodeColumnDesc)) {
+ // No column found!
+ // Bail out
+ return false;
+ }
+
+ ExprNodeColumnDesc colExpr = (ExprNodeColumnDesc) expr;
+ String colName = ExprNodeDescUtils.extractColName(colExpr);
+
+ // Fetch the TableScan Operator.
+ Operator<?> op = parentOfRS.getParentOperators().get(0);
+ while (op != null && !(op instanceof TableScanOperator)) {
+ op = op.getParentOperators().get(0);
+ }
+ assert op != null;
+
+ Table table = ((TableScanOperator) op).getConf().getTableMetadata();
+ if (table.isPartitionKey(colName)) {
+ // The column is a partition column; skip the optimization.
+ return false;
+ }
+ }
+
+ // If a hint is provided and only hinted semijoin optimizations should be
+ // created, then skip other columns on the table
+ if (parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY)
+ && sjHint.getColName() != null &&
+ !internalColName.equals(sjHint.getColName())) {
+ return false;
+ }
+
+ // Check if there already exists a semijoin branch
+ GroupByOperator gb = parseContext.getColExprToGBMap().get(key);
+ if (gb != null) {
+ // Already an existing semijoin branch, reuse it
+ createFinalRsForSemiJoinOp(parseContext, ts, gb, key, keyBaseAlias,
+ ctx.parent.getChildren().get(0), sjHint != null);
+ // done!
+ return true;
+ }
+
List<ExprNodeDesc> keyExprs = new ArrayList<ExprNodeDesc>();
keyExprs.add(key);
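Both walks in the hunk above follow the same shape: repeatedly take the first child of a wrapper expression (or, for operators, the first parent) until a node of the target type turns up, and bail out if the chain runs dry first. A generic sketch of that pattern; Node and link() are stand-ins, not the Hive ExprNodeDesc/Operator types:

import java.util.List;

public class ChainWalk {
  // link() returns the candidates for the next step: the children of an
  // expression node, or the parents of an operator, in the Hive analogy.
  interface Node { List<Node> link(); }

  // Follow the first link until a node of the target type appears; return
  // null if the chain ends first (mirrors the two while-loops above).
  static <T> T findFirst(Node start, Class<T> type) {
    Node cur = start;
    while (cur != null && !type.isInstance(cur)) {
      List<Node> next = cur.link();
      cur = (next == null || next.isEmpty()) ? null : next.get(0);
    }
    return type.cast(cur); // Class.cast(null) simply returns null
  }
}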
@@ -472,8 +521,6 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
HiveConf.getFloatVar(parseContext.getConf(),
HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
- ArrayList<ExprNodeDesc> groupByExprs = new ArrayList<ExprNodeDesc>();
-
// Add min/max and bloom filter aggregations
List<ObjectInspector> aggFnOIs = new ArrayList<ObjectInspector>();
aggFnOIs.add(key.getWritableObjectInspector());
@@ -493,9 +540,17 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
AggregationDesc bloomFilter = new AggregationDesc("bloom_filter",
FunctionRegistry.getGenericUDAFEvaluator("bloom_filter", aggFnOIs, false, false),
params, false, Mode.PARTIAL1);
- GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
+ GenericUDAFBloomFilterEvaluator bloomFilterEval =
+ (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
bloomFilterEval.setSourceOperator(selectOp);
+
+ if (sjHint != null && sjHint.getNumEntries() > 0) {
+ LOG.debug("Setting size for " + keyBaseAlias + " to " + sjHint.getNumEntries() + " based on the hint");
+ bloomFilterEval.setHintEntries(sjHint.getNumEntries());
+ }
bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES));
+ bloomFilterEval.setMinEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MIN_BLOOM_FILTER_ENTRIES));
+ bloomFilterEval.setFactor(parseContext.getConf().getFloatVar(ConfVars.TEZ_BLOOM_FILTER_FACTOR));
bloomFilter.setGenericUDAFWritableEvaluator(bloomFilterEval);
aggs.add(min);
aggs.add(max);
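The evaluator wiring above gains three sizing knobs next to the existing maximum: an optional per-hint entry count, a configured minimum, and a scaling factor. The exact interplay lives in GenericUDAFBloomFilterEvaluator and is not reproduced here; the following is only a hedged sketch of one plausible combination of those inputs:

public class BloomFilterSizing {
  // Hedged sketch only: the authoritative sizing logic is in
  // GenericUDAFBloomFilterEvaluator, which this does not reproduce.
  static long expectedEntries(long statsEstimate, long hintEntries,
                              long minEntries, long maxEntries, float factor) {
    if (hintEntries > 0) {
      return hintEntries;                          // explicit hint wins outright
    }
    long scaled = (long) (statsEstimate * factor); // factor pads the stats estimate
    return Math.max(minEntries, Math.min(maxEntries, scaled));
  }

  public static void main(String[] args) {
    // estimate 1M rows, no hint, clamp to [100K, 10M], factor 2.0 -> 2000000
    System.out.println(expectedEntries(1_000_000L, -1L, 100_000L, 10_000_000L, 2.0f));
  }
}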
@@ -547,6 +602,8 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
Map<String, ExprNodeDesc> columnExprMap = new HashMap<String, ExprNodeDesc>();
rsOp.setColumnExprMap(columnExprMap);
+ rsOp.getConf().setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.QUICKSTART));
+
// Create the final Group By Operator
ArrayList<AggregationDesc> aggsFinal = new ArrayList<AggregationDesc>();
try {
@@ -591,7 +648,12 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
bloomFilterFinalParams, false, Mode.FINAL);
GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
bloomFilterEval.setSourceOperator(selectOp);
+ if (sjHint != null && sjHint.getNumEntries() > 0) {
+ bloomFilterEval.setHintEntries(sjHint.getNumEntries());
+ }
bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES));
+ bloomFilterEval.setMinEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MIN_BLOOM_FILTER_ENTRIES));
+ bloomFilterEval.setFactor(parseContext.getConf().getFloatVar(ConfVars.TEZ_BLOOM_FILTER_FACTOR));
bloomFilter.setGenericUDAFWritableEvaluator(bloomFilterEval);
aggsFinal.add(min);
@@ -617,23 +679,56 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
rsOp.getConf().setOutputOperators(outputOperators);
}
+ createFinalRsForSemiJoinOp(parseContext, ts, groupByOpFinal, key,
+ keyBaseAlias, ctx.parent.getChildren().get(0), sjHint != null);
+
+ return true;
+ }
+
+ private void createFinalRsForSemiJoinOp(
+ ParseContext parseContext, TableScanOperator ts, GroupByOperator gb,
+ ExprNodeDesc key, String keyBaseAlias, ExprNodeDesc colExpr,
+ boolean isHint) throws SemanticException {
+ ArrayList<String> gbOutputNames = new ArrayList<>();
+ // One each for min, max and bloom filter
+ gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(0));
+ gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(1));
+ gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(2));
+
+ int colPos = 0;
+ ArrayList<ExprNodeDesc> rsValueCols = new ArrayList<ExprNodeDesc>();
+ for (int i = 0; i < gbOutputNames.size() - 1; i++) {
+ ExprNodeColumnDesc expr = new ExprNodeColumnDesc(key.getTypeInfo(),
+ gbOutputNames.get(colPos++), "", false);
+ rsValueCols.add(expr);
+ }
+
+ // Bloom filter output is binary
+ ExprNodeColumnDesc colBFExpr = new ExprNodeColumnDesc(TypeInfoFactory.binaryTypeInfo,
+ gbOutputNames.get(colPos++), "", false);
+ rsValueCols.add(colBFExpr);
+
// Create the final Reduce Sink Operator
ReduceSinkDesc rsDescFinal = PlanUtils.getReduceSinkDesc(
new ArrayList<ExprNodeDesc>(), rsValueCols, gbOutputNames, false,
-1, 0, 1, Operation.NOT_ACID);
ReduceSinkOperator rsOpFinal = (ReduceSinkOperator)OperatorFactory.getAndMakeChild(
- rsDescFinal, new RowSchema(groupByOpFinal.getSchema()), groupByOpFinal);
+ rsDescFinal, new RowSchema(gb.getSchema()), gb);
+ Map<String, ExprNodeDesc> columnExprMap = new HashMap<>();
rsOpFinal.setColumnExprMap(columnExprMap);
- LOG.debug("DynamicMinMaxPushdown: Saving RS to TS mapping: " + rsOpFinal + ": " + ts);
- parseContext.getRsOpToTsOpMap().put(rsOpFinal, ts);
+ LOG.debug("DynamicSemiJoinPushdown: Saving RS to TS mapping: " + rsOpFinal + ": " + ts);
+ SemiJoinBranchInfo sjInfo = new SemiJoinBranchInfo(ts, isHint);
+ parseContext.getRsToSemiJoinBranchInfo().put(rsOpFinal, sjInfo);
// for explain purpose
- if (parseContext.getContext().getExplainConfig() != null
- && parseContext.getContext().getExplainConfig().isFormatted()) {
- List<String> outputOperators = new ArrayList<>();
+ if (parseContext.getContext().getExplainConfig() != null &&
+ parseContext.getContext().getExplainConfig().isFormatted()) {
+ List<String> outputOperators = rsOpFinal.getConf().getOutputOperators();
+ if (outputOperators == null) {
+ outputOperators = new ArrayList<>();
+ }
outputOperators.add(ts.getOperatorId());
- rsOpFinal.getConf().setOutputOperators(outputOperators);
}
// Save the info that is required at query time to resolve dynamic/runtime values.
@@ -648,9 +743,9 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
runtimeValuesInfo.setTableDesc(rsFinalTableDesc);
runtimeValuesInfo.setDynamicValueIDs(dynamicValueIDs);
runtimeValuesInfo.setColExprs(rsValueCols);
+ runtimeValuesInfo.setTsColExpr(colExpr);
parseContext.getRsToRuntimeValuesInfoMap().put(rsOpFinal, runtimeValuesInfo);
-
- return true;
+ parseContext.getColExprToGBMap().put(key, gb);
}
private Map<Node, Object> collectDynamicPruningConditions(ExprNodeDesc pred, NodeProcessorCtx ctx)