You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/08 20:42:53 UTC
[03/51] [partial] hive git commit: Revert "HIVE-14671 : merge master into hive-14535 (Wei Zheng)"
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java
deleted file mode 100644
index 966c264..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.log;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.core.LogEvent;
-import org.apache.logging.log4j.core.LoggerContext;
-import org.apache.logging.log4j.core.appender.RandomAccessFileAppender;
-import org.apache.logging.log4j.core.appender.routing.Route;
-import org.apache.logging.log4j.core.appender.routing.Routes;
-import org.apache.logging.log4j.core.appender.routing.RoutingAppender;
-import org.apache.logging.log4j.core.config.Configuration;
-import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.logging.log4j.core.config.Node;
-import org.apache.logging.log4j.core.config.plugins.Plugin;
-import org.apache.logging.log4j.core.config.plugins.PluginFactory;
-import org.apache.logging.log4j.core.config.plugins.processor.PluginEntry;
-import org.apache.logging.log4j.core.config.plugins.util.PluginType;
-import org.apache.logging.log4j.core.filter.AbstractFilter;
-import org.apache.logging.log4j.core.layout.PatternLayout;
-
-/**
- * Divert appender to redirect and filter test operation logs to match the output of the original
- * CLI qtest results.
- */
-public final class LogDivertAppenderForTest {
- private LogDivertAppenderForTest() {
- // Prevent instantiation
- }
-
- /**
- * A log filter that filters test messages coming from the logger.
- */
- @Plugin(name = "TestFilter", category = "Core", elementType="filter", printObject = true)
- private static class TestFilter extends AbstractFilter {
- @Override
- public Result filter(LogEvent event) {
- if (event.getLevel().equals(Level.INFO) && "SessionState".equals(event.getLoggerName())) {
- if (event.getMessage().getFormattedMessage().startsWith("PREHOOK:")
- || event.getMessage().getFormattedMessage().startsWith("POSTHOOK:")) {
- return Result.ACCEPT;
- }
- }
- return Result.DENY;
- }
-
- @PluginFactory
- public static TestFilter createFilter() {
- return new TestFilter();
- }
- }
-
- /**
- * If the HIVE_IN_TEST is set, then programmatically register a routing appender to Log4J
- * configuration, which automatically writes the test log of each query to an individual file.
- * The equivalent property configuration is as follows:
- * # queryId based routing file appender
- appender.test-query-routing.type = Routing
- appender.test-query-routing.name = test-query-routing
- appender.test-query-routing.routes.type = Routes
- appender.test-query-routing.routes.pattern = $${ctx:queryId}
- # default route
- appender.test-query-routing.routes.test-route-default.type = Route
- appender.test-query-routing.routes.test-route-default.key = $${ctx:queryId}
- appender.test-query-routing.routes.test-route-default.app.type = NullAppender
- appender.test-query-routing.routes.test-route-default.app.name = test-null-appender
- # queryId based route
- appender.test-query-routing.routes.test-route-mdc.type = Route
- appender.test-query-routing.routes.test-route-mdc.name = test-query-routing
- appender.test-query-routing.routes.test-route-mdc.app.type = RandomAccessFile
- appender.test-query-routing.routes.test-route-mdc.app.name = test-query-file-appender
- appender.test-query-routing.routes.test-route-mdc.app.fileName = ${sys:hive.log.dir}/${ctx:sessionId}/${ctx:queryId}.test
- appender.test-query-routing.routes.test-route-mdc.app.layout.type = PatternLayout
- appender.test-query-routing.routes.test-route-mdc.app.layout.pattern = %d{ISO8601} %5p %c{2}: %m%n
- appender.test-query-routing.routes.test-route-mdc.app.filter.type = TestFilter
- * @param conf the configuration for HiveServer2 instance
- */
- public static void registerRoutingAppenderIfInTest(org.apache.hadoop.conf.Configuration conf) {
- if (!conf.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname,
- HiveConf.ConfVars.HIVE_IN_TEST.defaultBoolVal)) {
- // If not in test mode, then do no create the appender
- return;
- }
-
- String logLocation =
- HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION);
-
- // Create test-null-appender to drop events without queryId
- PluginEntry nullAppenderEntry = new PluginEntry();
- nullAppenderEntry.setClassName(NullAppender.class.getName());
- PluginType<NullAppender> nullAppenderType =
- new PluginType<NullAppender>(nullAppenderEntry, NullAppender.class, "appender");
- Node nullAppenderChildNode = new Node(null, "test-null-appender", nullAppenderType);
-
- // Create default route where events go without queryId
- PluginEntry defaultRouteEntry = new PluginEntry();
- defaultRouteEntry.setClassName(Route.class.getName());
- PluginType<Route> defaultRouteType = new PluginType<Route>(defaultRouteEntry, Route.class, "");
- Node defaultRouteNode = new Node(null, "test-route-default", defaultRouteType);
- // Add the test-null-appender to the default route
- defaultRouteNode.getChildren().add(nullAppenderChildNode);
-
- // Create queryId based route
- PluginEntry queryIdRouteEntry = new PluginEntry();
- queryIdRouteEntry.setClassName(Route.class.getName());
- PluginType<Route> queryIdRouteType = new PluginType<Route>(queryIdRouteEntry, Route.class, "");
- Node queryIdRouteNode = new Node(null, "test-route-mdc", queryIdRouteType);
-
- // Create the queryId appender for the queryId route
- PluginEntry queryIdAppenderEntry = new PluginEntry();
- queryIdAppenderEntry.setClassName(RandomAccessFileAppender.class.getName());
- PluginType<RandomAccessFileAppender> queryIdAppenderType =
- new PluginType<RandomAccessFileAppender>(queryIdAppenderEntry,
- RandomAccessFileAppender.class, "appender");
- Node queryIdAppenderNode =
- new Node(queryIdRouteNode, "test-query-file-appender", queryIdAppenderType);
- queryIdAppenderNode.getAttributes().put("fileName", logLocation
- + "/${ctx:sessionId}/${ctx:queryId}.test");
- queryIdAppenderNode.getAttributes().put("name", "test-query-file-appender");
- // Add the queryId appender to the queryId based route
- queryIdRouteNode.getChildren().add(queryIdAppenderNode);
-
- // Create the filter for the queryId appender
- PluginEntry filterEntry = new PluginEntry();
- filterEntry.setClassName(TestFilter.class.getName());
- PluginType<TestFilter> filterType =
- new PluginType<TestFilter>(filterEntry, TestFilter.class, "");
- Node filterNode = new Node(queryIdAppenderNode, "test-filter", filterType);
- // Add the filter to the queryId appender
- queryIdAppenderNode.getChildren().add(filterNode);
-
- // Create the layout for the queryId appender
- PluginEntry layoutEntry = new PluginEntry();
- layoutEntry.setClassName(PatternLayout.class.getName());
- PluginType<PatternLayout> layoutType =
- new PluginType<PatternLayout>(layoutEntry, PatternLayout.class, "");
- Node layoutNode = new Node(queryIdAppenderNode, "PatternLayout", layoutType);
- layoutNode.getAttributes().put("pattern", LogDivertAppender.nonVerboseLayout);
- // Add the layout to the queryId appender
- queryIdAppenderNode.getChildren().add(layoutNode);
-
- // Create the route objects based on the Nodes
- Route defaultRoute = Route.createRoute(null, "${ctx:queryId}", defaultRouteNode);
- Route mdcRoute = Route.createRoute(null, null, queryIdRouteNode);
- // Create the routes group
- Routes routes = Routes.createRoutes("${ctx:queryId}", defaultRoute, mdcRoute);
-
- LoggerContext context = (LoggerContext)LogManager.getContext(false);
- Configuration configuration = context.getConfiguration();
-
- // Create the appender
- RoutingAppender routingAppender = RoutingAppender.createAppender("test-query-routing",
- "true",
- routes,
- configuration,
- null,
- null,
- null);
-
- LoggerConfig loggerConfig = configuration.getRootLogger();
- loggerConfig.addAppender(routingAppender, null, null);
- context.updateLoggers();
- routingAppender.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index de68876..ea87cb4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -697,11 +697,6 @@ public class Hive {
throws InvalidOperationException, HiveException {
try {
validatePartition(newPart);
- String location = newPart.getLocation();
- if (location != null && !Utilities.isDefaultNameNode(conf)) {
- location = Utilities.getQualifiedPath(conf, new Path(location));
- newPart.setLocation(location);
- }
getMSC().alter_partition(dbName, tblName, newPart.getTPartition(), environmentContext);
} catch (MetaException e) {
@@ -741,11 +736,6 @@ public class Hive {
if (tmpPart.getParameters() != null) {
tmpPart.getParameters().remove(hive_metastoreConstants.DDL_TIME);
}
- String location = tmpPart.getLocation();
- if (location != null && !Utilities.isDefaultNameNode(conf)) {
- location = Utilities.getQualifiedPath(conf, new Path(location));
- tmpPart.setLocation(location);
- }
newTParts.add(tmpPart.getTPartition());
}
getMSC().alter_partitions(names[0], names[1], newTParts, environmentContext);
@@ -1218,27 +1208,6 @@ public class Hive {
}
}
-
-
- /**
- * Truncates the table/partition as per specifications. Just trash the data files
- *
- * @param dbDotTableName
- * name of the table
- * @throws HiveException
- */
- public void truncateTable(String dbDotTableName, Map<String, String> partSpec) throws HiveException {
- try {
- Table table = getTable(dbDotTableName, true);
-
- List<String> partNames = ((null == partSpec)
- ? null : getPartitionNames(table.getDbName(), table.getTableName(), partSpec, (short) -1));
- getMSC().truncateTable(table.getDbName(), table.getTableName(), partNames);
- } catch (Exception e) {
- throw new HiveException(e);
- }
- }
-
public HiveConf getConf() {
return (conf);
}
@@ -1444,6 +1413,7 @@ public class Hive {
*/
public List<String> getTablesByType(String dbName, String pattern, TableType type)
throws HiveException {
+ List<String> retList = new ArrayList<String>();
if (dbName == null)
dbName = SessionState.get().getCurrentDatabase();
@@ -1608,20 +1578,7 @@ public class Hive {
return getDatabase(currentDb);
}
- /**
- * @param loadPath
- * @param tableName
- * @param partSpec
- * @param replace
- * @param inheritTableSpecs
- * @param isSkewedStoreAsSubdir
- * @param isSrcLocal
- * @param isAcid
- * @param hasFollowingStatsTask
- * @return
- * @throws HiveException
- */
- public void loadPartition(Path loadPath, String tableName,
+ public void loadSinglePartition(Path loadPath, String tableName,
Map<String, String> partSpec, boolean replace, boolean inheritTableSpecs,
boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid,
boolean hasFollowingStatsTask, Long mmWriteId, boolean isCommitMmWrite)
@@ -1637,6 +1594,7 @@ public class Hive {
}
}
+
public void commitMmTableWrite(Table tbl, Long mmWriteId)
throws HiveException {
try {
@@ -1665,11 +1623,7 @@ public class Hive {
* location/inputformat/outputformat/serde details from table spec
* @param isSrcLocal
* If the source directory is LOCAL
- * @param isAcid
- * true if this is an ACID operation
- * @param hasFollowingStatsTask
- * true if there is a following task which updates the stats, so, this method need not update.
- * @return Partition object being loaded with data
+ * @param isAcid true if this is an ACID operation
*/
public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
@@ -1677,7 +1631,6 @@ public class Hive {
throws HiveException {
Path tblDataLocationPath = tbl.getDataLocation();
try {
- // Get the partition object if it already exists
Partition oldPart = getPartition(tbl, partSpec, false);
/**
* Move files before creating the partition since down stream processes
@@ -1715,12 +1668,6 @@ public class Hive {
List<Path> newFiles = null;
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin("MoveTask", "FileMoves");
- // If config is set, table is not temporary and partition being inserted exists, capture
- // the list of files added. For not yet existing partitions (insert overwrite to new partition
- // or dynamic partition inserts), the add partition event will capture the list of files added.
- if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) {
- newFiles = Collections.synchronizedList(new ArrayList<Path>());
- }
// TODO: this assumes both paths are qualified; which they are, currently.
if (mmWriteId != null && loadPath.equals(newPartPath)) {
// MM insert query, move itself is a no-op.
@@ -1731,8 +1678,7 @@ public class Hive {
}
Utilities.LOG14535.info("maybe deleting stuff from " + oldPartPath + " (new " + newPartPath + ") for replace");
if (replace && oldPartPath != null) {
- boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
- deleteOldPathForReplace(newPartPath, oldPartPath, getConf(), isAutoPurge,
+ deleteOldPathForReplace(newPartPath, oldPartPath, getConf(),
new ValidWriteIds.IdPathFilter(mmWriteId, false, true), mmWriteId != null,
tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0);
}
@@ -1747,10 +1693,13 @@ public class Hive {
}
Utilities.LOG14535.info("moving " + loadPath + " to " + destPath);
if (replace || (oldPart == null && !isAcid)) {
- boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(),
- isSrcLocal, isAutoPurge, newFiles, filter, mmWriteId != null);
+ isSrcLocal, filter, mmWriteId != null);
} else {
+ if (areEventsForDmlNeeded(tbl, oldPart)) {
+ newFiles = Collections.synchronizedList(new ArrayList<Path>());
+ }
+
FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
Hive.copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid, newFiles);
}
@@ -1759,17 +1708,13 @@ public class Hive {
Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
validatePartition(newTPart);
-
- // Generate an insert event only if inserting into an existing partition
- // When inserting into a new partition, the add partition event takes care of insert event
- if ((null != oldPart) && (null != newFiles)) {
- fireInsertEvent(tbl, partSpec, replace, newFiles);
+ if ((null != newFiles) || replace) {
+ fireInsertEvent(tbl, partSpec, newFiles);
} else {
- LOG.debug("No new files were created, and is not a replace, or we're inserting into a "
- + "partition that does not exist yet. Skipping generating INSERT event.");
+ LOG.debug("No new files were created, and is not a replace. Skipping generating INSERT event.");
}
- // column stats will be inaccurate
+ //column stats will be inaccurate
StatsSetupConst.clearColumnStatsState(newTPart.getParameters());
// recreate the partition if it existed before
@@ -2213,8 +2158,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
Utilities.LOG14535.info("not moving " + loadPath + " to " + tbl.getPath());
if (replace) {
Path tableDest = tbl.getPath();
- boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
- deleteOldPathForReplace(tableDest, tableDest, sessionConf, isAutopurge,
+ deleteOldPathForReplace(tableDest, tableDest, sessionConf,
new ValidWriteIds.IdPathFilter(mmWriteId, false, true), mmWriteId != null,
tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0);
}
@@ -2230,9 +2174,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
Utilities.LOG14535.info("moving " + loadPath + " to " + tblPath + " (replace = " + replace + ")");
if (replace) {
- boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
replaceFiles(tblPath, loadPath, destPath, tblPath,
- sessionConf, isSrcLocal, isAutopurge, newFiles, filter, mmWriteId != null);
+ sessionConf, isSrcLocal, filter, mmWriteId != null);
} else {
try {
FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf);
@@ -2278,7 +2221,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
commitMmTableWrite(tbl, mmWriteId);
}
- fireInsertEvent(tbl, null, replace, newFiles);
+ fireInsertEvent(tbl, null, newFiles);
}
/**
@@ -2503,7 +2446,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
else {
alterPartitionSpec(tbl, partSpec, tpart, inheritTableSpecs, partPath);
- fireInsertEvent(tbl, partSpec, true, newFiles);
+ fireInsertEvent(tbl, partSpec, newFiles);
}
}
if (tpart == null) {
@@ -2553,7 +2496,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
tpart.getSd().setLocation(partPath);
}
- private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, boolean replace, List<Path> newFiles)
+ private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, List<Path> newFiles)
throws HiveException {
if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
LOG.debug("Firing dml insert event");
@@ -2565,7 +2508,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf);
FireEventRequestData data = new FireEventRequestData();
InsertEventRequestData insertData = new InsertEventRequestData();
- insertData.setReplace(replace);
data.setInsertData(insertData);
if (newFiles != null && newFiles.size() > 0) {
for (Path p : newFiles) {
@@ -3111,6 +3053,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (!fullDestStatus.getFileStatus().isDirectory()) {
throw new HiveException(destf + " is not a directory.");
}
+ final boolean inheritPerms = HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
final List<Future<ObjectPair<Path, Path>>> futures = new LinkedList<>();
final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
@@ -3137,10 +3081,15 @@ private void constructOneLBLocationMap(FileStatus fSta,
// If we do a rename for a non-local file, we will be transfering the original
// file permissions from source to the destination. Else, in case of mvFile() where we
// copy from source to destination, we will inherit the destination's parent group ownership.
+ final String srcGroup = isRenameAllowed ? srcFile.getGroup() :
+ fullDestStatus.getFileStatus().getGroup();
if (null == pool) {
try {
Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isRenameAllowed);
+ if (inheritPerms) {
+ HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false);
+ }
if (null != newFiles) {
newFiles.add(destPath);
}
@@ -3155,7 +3104,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
SessionState.setCurrentSessionState(parentSession);
Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isRenameAllowed);
-
+ if (inheritPerms) {
+ HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false);
+ }
if (null != newFiles) {
newFiles.add(destPath);
}
@@ -3165,7 +3116,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
}
- if (null != pool) {
+ if (null == pool) {
+ if (inheritPerms) {
+ HdfsUtils.setFullFileStatus(conf, fullDestStatus, null, destFs, destf, true);
+ }
+ } else {
pool.shutdown();
for (Future<ObjectPair<Path, Path>> future : futures) {
try {
@@ -3223,34 +3178,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
return ShimLoader.getHadoopShims().getPathWithoutSchemeAndAuthority(path);
}
- /**
- * <p>
- * Moves a file from one {@link Path} to another. If {@code isRenameAllowed} is true then the
- * {@link FileSystem#rename(Path, Path)} method is used to move the file. If its false then the data is copied, if
- * {@code isSrcLocal} is true then the {@link FileSystem#copyFromLocalFile(Path, Path)} method is used, else
- * {@link FileUtils#copy(FileSystem, Path, FileSystem, Path, boolean, boolean, HiveConf)} is used.
- * </p>
- *
- * <p>
- * If the destination file already exists, then {@code _copy_[counter]} is appended to the file name, where counter
- * is an integer starting from 1.
- * </p>
- *
- * @param conf the {@link HiveConf} to use if copying data
- * @param sourceFs the {@link FileSystem} where the source file exists
- * @param sourcePath the {@link Path} to move
- * @param destFs the {@link FileSystem} to move the file to
- * @param destDirPath the {@link Path} to move the file to
- * @param isSrcLocal if the source file is on the local filesystem
- * @param isRenameAllowed true if the data should be renamed and not copied, false otherwise
- *
- * @return the {@link Path} the source file was moved to
- *
- * @throws IOException if there was an issue moving the file
- */
private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, FileSystem destFs, Path destDirPath,
boolean isSrcLocal, boolean isRenameAllowed) throws IOException {
+ boolean isBlobStoragePath = BlobStorageUtils.isBlobStoragePath(conf, destDirPath);
+
// Strip off the file type, if any so we don't make:
// 000000_0.gz -> 000000_0.gz_copy_1
final String fullname = sourcePath.getName();
@@ -3260,19 +3192,27 @@ private void constructOneLBLocationMap(FileStatus fSta,
Path destFilePath = new Path(destDirPath, fullname);
/*
- * The below loop may perform bad when the destination file already exists and it has too many _copy_
- * files as well. A desired approach was to call listFiles() and get a complete list of files from
- * the destination, and check whether the file exists or not on that list. However, millions of files
- * could live on the destination directory, and on concurrent situations, this can cause OOM problems.
- *
- * I'll leave the below loop for now until a better approach is found.
- */
- for (int counter = 1; destFs.exists(destFilePath); counter++) {
- destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
+ * The below loop may perform bad when the destination file already exists and it has too many _copy_
+ * files as well. A desired approach was to call listFiles() and get a complete list of files from
+ * the destination, and check whether the file exists or not on that list. However, millions of files
+ * could live on the destination directory, and on concurrent situations, this can cause OOM problems.
+ *
+ * I'll leave the below loop for now until a better approach is found.
+ */
+
+ int counter = 1;
+ if (!isRenameAllowed || isBlobStoragePath) {
+ while (destFs.exists(destFilePath)) {
+ destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
+ counter++;
+ }
}
if (isRenameAllowed) {
- destFs.rename(sourcePath, destFilePath);
+ while (!destFs.rename(sourcePath, destFilePath)) {
+ destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
+ counter++;
+ }
} else if (isSrcLocal) {
destFs.copyFromLocalFile(sourcePath, destFilePath);
} else {
@@ -3281,6 +3221,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
false, // overwrite destination
conf);
}
+
return destFilePath;
}
@@ -3309,30 +3250,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
- // List the new files in destination path which gets copied from source.
- public static void listNewFilesRecursively(final FileSystem destFs, Path dest,
- List<Path> newFiles) throws HiveException {
- try {
- for (FileStatus fileStatus : destFs.listStatus(dest, FileUtils.HIDDEN_FILES_PATH_FILTER)) {
- if (fileStatus.isDirectory()) {
- // If it is a sub-directory, then recursively list the files.
- listNewFilesRecursively(destFs, fileStatus.getPath(), newFiles);
- } else {
- newFiles.add(fileStatus.getPath());
- }
- }
- } catch (IOException e) {
- LOG.error("Failed to get source file statuses", e);
- throw new HiveException(e.getMessage(), e);
- }
- }
-
//it is assumed that parent directory of the destf should already exist when this
//method is called. when the replace value is true, this method works a little different
//from mv command if the destf is a directory, it replaces the destf instead of moving under
//the destf. in this case, the replaced destf still preserves the original destf's permission
- public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, boolean replace,
- boolean isSrcLocal) throws HiveException {
+ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf,
+ boolean replace, boolean isSrcLocal) throws HiveException {
final FileSystem srcFs, destFs;
try {
destFs = destf.getFileSystem(conf);
@@ -3343,10 +3266,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
try {
srcFs = srcf.getFileSystem(conf);
} catch (IOException e) {
- LOG.error("Failed to get src fs", e);
+ LOG.error("Failed to get dest fs", e);
throw new HiveException(e.getMessage(), e);
}
+ //needed for perm inheritance.
+ final boolean inheritPerms = HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
HdfsUtils.HadoopFileStatus destStatus = null;
// If source path is a subdirectory of the destination path (or the other way around):
@@ -3358,7 +3284,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
boolean srcIsSubDirOfDest = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal),
destIsSubDirOfSrc = isSubDir(destf, srcf, destFs, srcFs, false);
try {
- if (replace) {
+ if (inheritPerms || replace) {
try{
destStatus = new HdfsUtils.HadoopFileStatus(conf, destFs, destf);
//if destf is an existing directory:
@@ -3371,6 +3297,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
LOG.debug("The path " + destf.toString() + " is deleted");
}
} catch (FileNotFoundException ignore) {
+ //if dest dir does not exist, any re
+ if (inheritPerms) {
+ destStatus = new HdfsUtils.HadoopFileStatus(conf, destFs, destf.getParent());
+ }
}
}
final HdfsUtils.HadoopFileStatus desiredStatus = destStatus;
@@ -3378,6 +3308,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (isSrcLocal) {
// For local src file, copy to hdfs
destFs.copyFromLocalFile(srcf, destf);
+ if (inheritPerms) {
+ HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true);
+ }
return true;
} else {
if (needToCopy(srcf, destf, srcFs, destFs)) {
@@ -3414,7 +3347,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
public Void call() throws Exception {
SessionState.setCurrentSessionState(parentSession);
final String group = srcStatus.getGroup();
- if(!destFs.rename(srcStatus.getPath(), destFile)) {
+ if(destFs.rename(srcStatus.getPath(), destFile)) {
+ if (inheritPerms) {
+ HdfsUtils.setFullFileStatus(conf, desiredStatus, group, destFs, destFile, false);
+ }
+ } else {
throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest path:"
+ destFile + " returned false");
}
@@ -3423,7 +3360,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
}));
}
}
- if (null != pool) {
+ if (null == pool) {
+ if (inheritPerms) {
+ HdfsUtils.setFullFileStatus(conf, desiredStatus, null, destFs, destf, true);
+ }
+ } else {
pool.shutdown();
for (Future<Void> future : futures) {
try {
@@ -3438,6 +3379,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
return true;
} else {
if (destFs.rename(srcf, destf)) {
+ if (inheritPerms) {
+ HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true);
+ }
return true;
}
return false;
@@ -3488,10 +3432,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
*/
static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
FileSystem fs, boolean isSrcLocal, boolean isAcid, List<Path> newFiles) throws HiveException {
+ boolean inheritPerms = HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
try {
// create the destination if it does not exist
if (!fs.exists(destf)) {
- FileUtils.mkdir(fs, destf, conf);
+ FileUtils.mkdir(fs, destf, inheritPerms, conf);
}
} catch (IOException e) {
throw new HiveException(
@@ -3625,16 +3571,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
* @param oldPath
* The directory where the old data location, need to be cleaned up. Most of time, will be the same
* as destf, unless its across FileSystem boundaries.
- * @param purge
- * When set to true files which needs to be deleted are not moved to Trash
* @param isSrcLocal
* If the source directory is LOCAL
- * @param newFiles
- * Output the list of new files replaced in the destination path
*/
protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
- boolean isSrcLocal, boolean purge, List<Path> newFiles, PathFilter deletePathFilter,
- boolean isMmTable) throws HiveException {
+ boolean isSrcLocal, PathFilter deletePathFilter, boolean isMmTable) throws HiveException {
try {
FileSystem destFs = destf.getFileSystem(conf);
@@ -3655,12 +3596,14 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (oldPath != null) {
// TODO: we assume lbLevels is 0 here. Same as old code for non-MM.
// For MM tables, this can only be a LOAD command. Does LOAD even support LB?
- deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, isMmTable, 0);
+ deleteOldPathForReplace(destf, oldPath, conf, deletePathFilter, isMmTable, 0);
}
// first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates
- // destf
- boolean destfExist = FileUtils.mkdir(destFs, destf, conf);
+ // destf with inherited permissions
+ boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars
+ .HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+ boolean destfExist = FileUtils.mkdir(destFs, destf, inheritPerms, conf);
if(!destfExist) {
throw new IOException("Directory " + destf.toString()
+ " does not exist and could not be created.");
@@ -3676,23 +3619,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal)) {
throw new IOException("Error moving: " + srcf + " into: " + destf);
}
-
- // Add file paths of the files that will be moved to the destination if the caller needs it
- if (null != newFiles) {
- listNewFilesRecursively(destFs, destf, newFiles);
- }
- } else {
- // its either a file or glob
+ } else { // its either a file or glob
for (FileStatus src : srcs) {
- Path destFile = new Path(destf, src.getPath().getName());
- if (!moveFile(conf, src.getPath(), destFile, true, isSrcLocal)) {
+ if (!moveFile(conf, src.getPath(), new Path(destf, src.getPath().getName()), true, isSrcLocal)) {
throw new IOException("Error moving: " + srcf + " into: " + destf);
}
-
- // Add file paths of the files that will be moved to the destination if the caller needs it
- if (null != newFiles) {
- newFiles.add(destFile);
- }
}
}
} catch (IOException e) {
@@ -3700,7 +3631,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
- private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, boolean purge,
+ private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf,
PathFilter pathFilter, boolean isMmTable, int lbLevels) throws HiveException {
Utilities.LOG14535.info("Deleting old paths for replace in " + destPath + " and old path " + oldPath);
boolean isOldPathUnderDestf = false;
@@ -3714,7 +3645,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
isOldPathUnderDestf = isSubDir(oldPath, destPath, oldFs, destFs, false);
if (isOldPathUnderDestf || isMmTable) {
if (lbLevels == 0 || !isMmTable) {
- cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf, purge);
+ cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf);
} else {
// We need to clean up different MM IDs from each LB directory separately.
// Avoid temporary directories in the immediate table/part dir.
@@ -3732,7 +3663,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
throw new HiveException("Unexpected path during overwrite: " + lbPath);
}
Utilities.LOG14535.info("Cleaning up LB directory " + lbPath);
- cleanUpOneDirectoryForReplace(lbPath, oldFs, pathFilter, conf, purge);
+ cleanUpOneDirectoryForReplace(lbPath, oldFs, pathFilter, conf);
}
}
}
@@ -3750,7 +3681,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
private void cleanUpOneDirectoryForReplace(Path path, FileSystem fs,
- PathFilter pathFilter, HiveConf conf, boolean purge) throws IOException, HiveException {
+ PathFilter pathFilter, HiveConf conf) throws IOException, HiveException {
FileStatus[] statuses = fs.listStatus(path, pathFilter);
if (statuses == null || statuses.length == 0) return;
String s = "Deleting files under " + path + " for replace: ";
@@ -3758,7 +3689,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
s += file.getPath().getName() + ", ";
}
Utilities.LOG14535.info(s);
- if (!trashFiles(fs, statuses, conf, purge)) {
+ if (!trashFiles(fs, statuses, conf)) {
throw new HiveException("Old path " + path + " has not been cleaned up.");
}
}
@@ -3772,8 +3703,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
* @return true if deletion successful
* @throws IOException
*/
- public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuses,
- final Configuration conf, final boolean purge)
+ public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuses, final Configuration conf)
throws IOException {
boolean result = true;
@@ -3787,13 +3717,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
final SessionState parentSession = SessionState.get();
for (final FileStatus status : statuses) {
if (null == pool) {
- result &= FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
+ result &= FileUtils.moveToTrash(fs, status.getPath(), conf);
} else {
futures.add(pool.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
SessionState.setCurrentSessionState(parentSession);
- return FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
+ return FileUtils.moveToTrash(fs, status.getPath(), conf);
}
}));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
index b121eea..1d78b4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.calcite.adapter.druid.DruidQuery;
import org.apache.calcite.adapter.druid.DruidSchema;
import org.apache.calcite.adapter.druid.DruidTable;
-import org.apache.calcite.adapter.druid.LocalInterval;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptMaterialization;
@@ -311,7 +310,7 @@ public final class HiveMaterializedViewsRegistry {
}
metrics.add(field.getName());
}
- List<LocalInterval> intervals = Arrays.asList(DruidTable.DEFAULT_INTERVAL);
+ List<Interval> intervals = Arrays.asList(DruidTable.DEFAULT_INTERVAL);
DruidTable druidTable = new DruidTable(new DruidSchema(address, address, false),
dataSource, RelDataTypeImpl.proto(rowType), metrics, DruidTable.DEFAULT_TIMESTAMP_COLUMN, intervals);
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 4add836..6805c17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.metadata;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -36,10 +35,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import com.google.common.collect.Sets;
-import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
@@ -53,6 +49,8 @@ import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.thrift.TException;
import com.google.common.util.concurrent.MoreExecutors;
@@ -66,7 +64,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class HiveMetaStoreChecker {
public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreChecker.class);
- public static final String CLASS_NAME = HiveMetaStoreChecker.class.getName();
private final Hive hive;
private final HiveConf conf;
@@ -211,28 +208,19 @@ public class HiveMetaStoreChecker {
return;
}
- PartitionIterable parts;
+ List<Partition> parts = new ArrayList<Partition>();
boolean findUnknownPartitions = true;
if (table.isPartitioned()) {
if (partitions == null || partitions.isEmpty()) {
- String mode = HiveConf.getVar(conf, ConfVars.HIVEMAPREDMODE, (String) null);
- if ("strict".equalsIgnoreCase(mode)) {
- parts = new PartitionIterable(hive, table, null, conf.getIntVar(
- HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
- } else {
- List<Partition> loadedPartitions = new ArrayList<>();
- PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARTITION_RETRIEVING);
- loadedPartitions.addAll(hive.getAllPartitionsOf(table));
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARTITION_RETRIEVING);
- parts = new PartitionIterable(loadedPartitions);
- }
+ PrunedPartitionList prunedPartList =
+ PartitionPruner.prune(table, null, conf, toString(), null);
+ // no partitions specified, let's get all
+ parts.addAll(prunedPartList.getPartitions());
} else {
// we're interested in specific partitions,
// don't check for any others
findUnknownPartitions = false;
- List<Partition> loadedPartitions = new ArrayList<>();
for (Map<String, String> map : partitions) {
Partition part = hive.getPartition(table, map, false);
if (part == null) {
@@ -241,13 +229,10 @@ public class HiveMetaStoreChecker {
pr.setPartitionName(Warehouse.makePartPath(map));
result.getPartitionsNotInMs().add(pr);
} else {
- loadedPartitions.add(part);
+ parts.add(part);
}
}
- parts = new PartitionIterable(loadedPartitions);
}
- } else {
- parts = new PartitionIterable(Collections.<Partition>emptyList());
}
checkTable(table, parts, findUnknownPartitions, result);
@@ -270,7 +255,7 @@ public class HiveMetaStoreChecker {
* @throws HiveException
* Could not create Partition object
*/
- void checkTable(Table table, PartitionIterable parts,
+ void checkTable(Table table, List<Partition> parts,
boolean findUnknownPartitions, CheckResult result) throws IOException,
HiveException {
@@ -299,9 +284,7 @@ public class HiveMetaStoreChecker {
}
for (int i = 0; i < partition.getSpec().size(); i++) {
- Path qualifiedPath = partPath.makeQualified(fs);
- StringInternUtils.internUriStringsInPath(qualifiedPath);
- partPaths.add(qualifiedPath);
+ partPaths.add(partPath.makeQualified(fs));
partPath = partPath.getParent();
}
}
@@ -331,7 +314,7 @@ public class HiveMetaStoreChecker {
// now check the table folder and see if we find anything
// that isn't in the metastore
Set<Path> allPartDirs = new HashSet<Path>();
- checkPartitionDirs(tablePath, allPartDirs, Collections.unmodifiableList(table.getPartColNames()));
+ checkPartitionDirs(tablePath, allPartDirs, table.getPartCols().size());
// don't want the table dir
allPartDirs.remove(tablePath);
@@ -415,14 +398,14 @@ public class HiveMetaStoreChecker {
* Start directory
* @param allDirs
* This set will contain the leaf paths at the end.
- * @param list
+ * @param maxDepth
* Specify how deep the search goes.
* @throws IOException
* Thrown if we can't get lists from the fs.
* @throws HiveException
*/
- private void checkPartitionDirs(Path basePath, Set<Path> allDirs, final List<String> partColNames) throws IOException, HiveException {
+ private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth) throws IOException, HiveException {
// Here we just reuse the THREAD_COUNT configuration for
// METASTORE_FS_HANDLER_THREADS_COUNT since this results in better performance
// The number of missing partitions discovered are later added by metastore using a
@@ -440,21 +423,21 @@ public class HiveMetaStoreChecker {
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build();
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, threadFactory);
}
- checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), partColNames);
+ checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), maxDepth);
executor.shutdown();
}
private final class PathDepthInfoCallable implements Callable<Path> {
- private final List<String> partColNames;
+ private final int maxDepth;
private final FileSystem fs;
private final ConcurrentLinkedQueue<PathDepthInfo> pendingPaths;
private final boolean throwException;
private final PathDepthInfo pd;
- private PathDepthInfoCallable(PathDepthInfo pd, List<String> partColNames, FileSystem fs,
+ private PathDepthInfoCallable(PathDepthInfo pd, int maxDepth, FileSystem fs,
ConcurrentLinkedQueue<PathDepthInfo> basePaths) {
- this.partColNames = partColNames;
+ this.maxDepth = maxDepth;
this.pd = pd;
this.fs = fs;
this.pendingPaths = basePaths;
@@ -474,50 +457,39 @@ public class HiveMetaStoreChecker {
FileStatus[] fileStatuses = fs.listStatus(currentPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
// found no files under a sub-directory under table base path; it is possible that the table
// is empty and hence there are no partition sub-directories created under base path
- if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth < partColNames.size()) {
+ if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth < maxDepth) {
// since maxDepth is not yet reached, we are missing partition
// columns in currentPath
- logOrThrowExceptionWithMsg(
- "MSCK is missing partition columns under " + currentPath.toString());
+ if (throwException) {
+ throw new HiveException(
+ "MSCK is missing partition columns under " + currentPath.toString());
+ } else {
+ LOG.warn("MSCK is missing partition columns under " + currentPath.toString());
+ }
} else {
// found files under currentPath add them to the queue if it is a directory
for (FileStatus fileStatus : fileStatuses) {
- if (!fileStatus.isDirectory() && currentDepth < partColNames.size()) {
+ if (!fileStatus.isDirectory() && currentDepth < maxDepth) {
// found a file at depth which is less than number of partition keys
- logOrThrowExceptionWithMsg(
- "MSCK finds a file rather than a directory when it searches for "
- + fileStatus.getPath().toString());
- } else if (fileStatus.isDirectory() && currentDepth < partColNames.size()) {
- // found a sub-directory at a depth less than number of partition keys
- // validate if the partition directory name matches with the corresponding
- // partition colName at currentDepth
- Path nextPath = fileStatus.getPath();
- String[] parts = nextPath.getName().split("=");
- if (parts.length != 2) {
- logOrThrowExceptionWithMsg("Invalid partition name " + nextPath);
- } else if (!parts[0].equalsIgnoreCase(partColNames.get(currentDepth))) {
- logOrThrowExceptionWithMsg(
- "Unexpected partition key " + parts[0] + " found at " + nextPath);
+ if (throwException) {
+ throw new HiveException(
+ "MSCK finds a file rather than a directory when it searches for "
+ + fileStatus.getPath().toString());
} else {
- // add sub-directory to the work queue if maxDepth is not yet reached
- pendingPaths.add(new PathDepthInfo(nextPath, currentDepth + 1));
+ LOG.warn("MSCK finds a file rather than a directory when it searches for "
+ + fileStatus.getPath().toString());
}
+ } else if (fileStatus.isDirectory() && currentDepth < maxDepth) {
+ // add sub-directory to the work queue if maxDepth is not yet reached
+ pendingPaths.add(new PathDepthInfo(fileStatus.getPath(), currentDepth + 1));
}
}
- if (currentDepth == partColNames.size()) {
+ if (currentDepth == maxDepth) {
return currentPath;
}
}
return null;
}
-
- private void logOrThrowExceptionWithMsg(String msg) throws HiveException {
- if(throwException) {
- throw new HiveException(msg);
- } else {
- LOG.warn(msg);
- }
- }
}
private static class PathDepthInfo {
@@ -531,7 +503,7 @@ public class HiveMetaStoreChecker {
private void checkPartitionDirs(final ExecutorService executor,
final Path basePath, final Set<Path> result,
- final FileSystem fs, final List<String> partColNames) throws HiveException {
+ final FileSystem fs, final int maxDepth) throws HiveException {
try {
Queue<Future<Path>> futures = new LinkedList<Future<Path>>();
ConcurrentLinkedQueue<PathDepthInfo> nextLevel = new ConcurrentLinkedQueue<>();
@@ -548,7 +520,7 @@ public class HiveMetaStoreChecker {
//process each level in parallel
while(!nextLevel.isEmpty()) {
futures.add(
- executor.submit(new PathDepthInfoCallable(nextLevel.poll(), partColNames, fs, tempQueue)));
+ executor.submit(new PathDepthInfoCallable(nextLevel.poll(), maxDepth, fs, tempQueue)));
}
while(!futures.isEmpty()) {
Path p = futures.poll().get();
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index a319b88..8eb011e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -391,7 +391,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
throw new MetaException("Temp table path not set for " + tbl.getTableName());
} else {
if (!wh.isDir(tblPath)) {
- if (!wh.mkdirs(tblPath)) {
+ if (!wh.mkdirs(tblPath, true)) {
throw new MetaException(tblPath
+ " is not a directory or unable to create one");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index 5efaf70..c53ddad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -18,19 +18,15 @@
package org.apache.hadoop.hive.ql.metadata;
-import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
-import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -508,14 +504,6 @@ public class Table implements Serializable {
return null;
}
- public List<String> getPartColNames() {
- List<String> partColNames = new ArrayList<String>();
- for (FieldSchema key : getPartCols()) {
- partColNames.add(key.getName());
- }
- return partColNames;
- }
-
public boolean isPartitionKey(String colName) {
return getPartColByName(colName) == null ? false : true;
}
@@ -948,16 +936,6 @@ public class Table implements Serializable {
}
}
- public boolean isEmpty() throws HiveException {
- Preconditions.checkNotNull(getPath());
- try {
- FileSystem fs = FileSystem.get(getPath().toUri(), SessionState.getSessionConf());
- return !fs.exists(getPath()) || fs.listStatus(getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER).length == 0;
- } catch (IOException e) {
- throw new HiveException(e);
- }
- }
-
public boolean isTemporary() {
return tTable.isTemporary();
}
@@ -986,7 +964,7 @@ public class Table implements Serializable {
public static void validateColumns(List<FieldSchema> columns, List<FieldSchema> partCols)
throws HiveException {
- Set<String> colNames = new HashSet<>();
+ List<String> colNames = new ArrayList<String>();
for (FieldSchema partCol: columns) {
String colName = normalize(partCol.getName());
if (colNames.contains(colName)) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
index 2435bf1..044d64c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
@@ -60,7 +60,7 @@ public enum VirtualColumn {
*/
GROUPINGID("GROUPING__ID", TypeInfoFactory.intTypeInfo);
- public static final ImmutableSet<String> VIRTUAL_COLUMN_NAMES =
+ public static ImmutableSet<String> VIRTUAL_COLUMN_NAMES =
ImmutableSet.of(FILENAME.getName(), BLOCKOFFSET.getName(), ROWOFFSET.getName(),
RAWDATASIZE.getName(), GROUPINGID.getName(), ROWID.getName());
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
index d59603e..7e39d77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
@@ -158,15 +158,14 @@ public class ColumnPruner extends Transform {
boolean walkChildren = true;
opStack.push(nd);
- // no need to go further down for a select op with all file sink or script
- // child since all cols are needed for these ops
- // However, if one of the children is not file sink or script, we still go down.
+ // no need to go further down for a select op with a file sink or script
+ // child
+ // since all cols are needed for these ops
if (nd instanceof SelectOperator) {
- walkChildren = false;
for (Node child : nd.getChildren()) {
- if (!(child instanceof FileSinkOperator || child instanceof ScriptOperator)) {
- walkChildren = true;
- break;
+ if ((child instanceof FileSinkOperator)
+ || (child instanceof ScriptOperator)) {
+ walkChildren = false;
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
index 45839ad..00ec03e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
@@ -671,11 +671,10 @@ public final class ColumnPrunerProcFactory {
List<FieldNode> colsAfterReplacement = new ArrayList<>();
List<FieldNode> newCols = new ArrayList<>();
- for (int index = 0; index < numSelColumns; index++) {
- String colName = outputCols.get(index);
- FieldNode col = lookupColumn(cols, colName);
+ for (FieldNode col : cols) {
+ int index = outputCols.indexOf(col.getFieldName());
// colExprMap.size() == size of cols from SEL(*) branch
- if (col != null) {
+ if (index >= 0 && index < numSelColumns) {
ExprNodeDesc transformed = colExprMap.get(col.getFieldName());
colsAfterReplacement = mergeFieldNodesWithDesc(colsAfterReplacement, transformed);
newCols.add(col);
@@ -714,14 +713,12 @@ public final class ColumnPrunerProcFactory {
RowSchema rs = op.getSchema();
ArrayList<ExprNodeDesc> colList = new ArrayList<>();
List<FieldNode> outputCols = new ArrayList<>();
- for (ColumnInfo colInfo : rs.getSignature()) {
- FieldNode col = lookupColumn(cols, colInfo.getInternalName());
- if (col != null) {
- // revert output cols of SEL(*) to ExprNodeColumnDesc
- ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo);
- colList.add(colExpr);
- outputCols.add(col);
- }
+ for (FieldNode col : cols) {
+ // revert output cols of SEL(*) to ExprNodeColumnDesc
+ ColumnInfo colInfo = rs.getColumnInfo(col.getFieldName());
+ ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo);
+ colList.add(colExpr);
+ outputCols.add(col);
}
// replace SEL(*) to SEL(exprs)
((SelectDesc)select.getConf()).setSelStarNoCompute(false);
@@ -813,18 +810,11 @@ public final class ColumnPrunerProcFactory {
ArrayList<String> newOutputColumnNames = new ArrayList<String>();
ArrayList<ColumnInfo> rs_oldsignature = op.getSchema().getSignature();
ArrayList<ColumnInfo> rs_newsignature = new ArrayList<ColumnInfo>();
- // The pruning needs to preserve the order of columns in the input schema
- Set<String> colNames = new HashSet<String>();
for (FieldNode col : cols) {
- colNames.add(col.getFieldName());
- }
- for (int i = 0; i < originalOutputColumnNames.size(); i++) {
- String colName = originalOutputColumnNames.get(i);
- if (colNames.contains(colName)) {
- newOutputColumnNames.add(colName);
- newColList.add(originalColList.get(i));
- rs_newsignature.add(rs_oldsignature.get(i));
- }
+ int index = originalOutputColumnNames.indexOf(col.getFieldName());
+ newOutputColumnNames.add(col.getFieldName());
+ newColList.add(originalColList.get(index));
+ rs_newsignature.add(rs_oldsignature.get(index));
}
op.getSchema().setSignature(rs_newsignature);
conf.setColList(newColList);
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index d0fdb52..e68618a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
import org.apache.hadoop.hive.ql.parse.GenTezUtils;
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
@@ -69,8 +68,6 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* ConvertJoinMapJoin is an optimization that replaces a common join
* (aka shuffle join) with a map join (aka broadcast or fragment replicate
@@ -98,19 +95,15 @@ public class ConvertJoinMapJoin implements NodeProcessor {
JoinOperator joinOp = (JoinOperator) nd;
long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
- // adjust noconditional task size threshold for LLAP
- maxSize = getNoConditionalTaskSizeForLlap(maxSize, context.conf);
- joinOp.getConf().setNoConditionalTaskSize(maxSize);
-
TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
+ Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
if (retval == null) {
return retval;
} else {
- fallbackToReduceSideJoin(joinOp, context, maxSize);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
}
@@ -127,13 +120,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
LOG.info("Estimated number of buckets " + numBuckets);
int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize, true);
if (mapJoinConversionPos < 0) {
- Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
+ Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
if (retval == null) {
return retval;
} else {
// only case is full outer join with SMB enabled which is not possible. Convert to regular
// join.
- fallbackToReduceSideJoin(joinOp, context, maxSize);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
}
@@ -154,7 +147,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
if (mapJoinConversionPos < 0) {
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- fallbackToReduceSideJoin(joinOp, context, maxSize);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
@@ -171,54 +164,15 @@ public class ConvertJoinMapJoin implements NodeProcessor {
return null;
}
- @VisibleForTesting
- public long getNoConditionalTaskSizeForLlap(final long maxSize, final HiveConf conf) {
- if ("llap".equalsIgnoreCase(conf.getVar(ConfVars.HIVE_EXECUTION_MODE))) {
- LlapClusterStateForCompile llapInfo = LlapClusterStateForCompile.getClusterInfo(conf);
- llapInfo.initClusterInfo();
- final int executorsPerNode;
- if (!llapInfo.hasClusterInfo()) {
- LOG.warn("LLAP cluster information not available. Falling back to getting #executors from hiveconf..");
- executorsPerNode = conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
- } else {
- final int numExecutorsPerNodeFromCluster = llapInfo.getNumExecutorsPerNode();
- if (numExecutorsPerNodeFromCluster == -1) {
- LOG.warn("Cannot determine executor count from LLAP cluster information. Falling back to getting #executors" +
- " from hiveconf..");
- executorsPerNode = conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
- } else {
- executorsPerNode = numExecutorsPerNodeFromCluster;
- }
- }
- final int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
- if (numSessions > 0) {
- final int availableSlotsPerQuery = (int) ((double) executorsPerNode / numSessions);
- final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
- final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY);
- final int slotsPerQuery = Math.min(maxSlotsPerQuery, availableSlotsPerQuery);
- final long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery));
- LOG.info("No conditional task size adjusted for LLAP. executorsPerNode: {}, numSessions: {}, " +
- "availableSlotsPerQuery: {}, overSubscriptionFactor: {}, maxSlotsPerQuery: {}, slotsPerQuery: {}, " +
- "noconditionalTaskSize: {}, adjustedNoconditionalTaskSize: {}", executorsPerNode, numSessions,
- availableSlotsPerQuery, overSubscriptionFactor, maxSlotsPerQuery, slotsPerQuery, maxSize, llapMaxSize);
- return Math.max(maxSize, llapMaxSize);
- } else {
- LOG.warn(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname + " returned value {}. Returning {}" +
- " as no conditional task size for LLAP.", numSessions, maxSize);
- }
- }
- return maxSize;
- }
-
@SuppressWarnings("unchecked")
private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp,
- TezBucketJoinProcCtx tezBucketJoinProcCtx, final long maxSize) throws SemanticException {
+ TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
// we cannot convert to bucket map join, we cannot convert to
// map join either based on the size. Check if we can convert to SMB join.
if ((HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false)
|| ((!HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN_REDUCE))
&& joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
- fallbackToReduceSideJoin(joinOp, context, maxSize);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
@@ -247,7 +201,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// contains aliases from sub-query
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- fallbackToReduceSideJoin(joinOp, context, maxSize);
+ fallbackToReduceSideJoin(joinOp, context);
return null;
}
@@ -257,7 +211,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
} else {
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- fallbackToReduceSideJoin(joinOp, context, maxSize);
+ fallbackToReduceSideJoin(joinOp, context);
}
return null;
}
@@ -281,7 +235,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
joinOp.getConf().getBaseSrc(), joinOp).getSecond(),
null, joinDesc.getExprs(), null, null,
joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
- joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null, joinDesc.getNoConditionalTaskSize());
+ joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
mapJoinDesc.setNullSafes(joinDesc.getNullSafes());
mapJoinDesc.setFilterMap(joinDesc.getFilterMap());
mapJoinDesc.setResidualFilterExprs(joinDesc.getResidualFilterExprs());
@@ -840,7 +794,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// The semijoin branch can potentially create a task level cycle
// with the hashjoin except when it is dynamically partitioned hash
// join which takes place in a separate task.
- if (context.parseContext.getRsToSemiJoinBranchInfo().size() > 0
+ if (context.parseContext.getRsOpToTsOpMap().size() > 0
&& removeReduceSink) {
removeCycleCreatingSemiJoinOps(mapJoinOp, parentSelectOpOfBigTableOp,
context.parseContext);
@@ -872,7 +826,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
ReduceSinkOperator rs = (ReduceSinkOperator) op;
- TableScanOperator ts = parseContext.getRsToSemiJoinBranchInfo().get(rs).getTsOp();
+ TableScanOperator ts = parseContext.getRsOpToTsOpMap().get(rs);
if (ts == null) {
// skip, no semijoin branch
continue;
@@ -897,11 +851,6 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
if (semiJoinMap.size() > 0) {
for (ReduceSinkOperator rs : semiJoinMap.keySet()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found semijoin optimization from the big table side of a map join, which will cause a task cycle. "
- + "Removing semijoin "
- + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(semiJoinMap.get(rs)));
- }
GenTezUtils.removeBranch(rs);
GenTezUtils.removeSemiJoinOperator(parseContext, rs,
semiJoinMap.get(rs));
@@ -974,14 +923,15 @@ public class ConvertJoinMapJoin implements NodeProcessor {
return numBuckets;
}
- private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context,
- final long maxSize)
+ private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context)
throws SemanticException {
// Attempt dynamic partitioned hash join
// Since we don't have big table index yet, must start with estimate of numReducers
int numReducers = estimateNumBuckets(joinOp, false);
LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
- int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false, maxSize,false);
+ int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false,
+ context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD),
+ false);
if (bigTablePos >= 0) {
// Now that we have the big table index, get real numReducers value based on big table RS
ReduceSinkOperator bigTableParentRS =
@@ -1016,11 +966,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
return false;
}
- private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context, final long maxSize)
+ private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context)
throws SemanticException {
if (context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &&
context.conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN)) {
- if (convertJoinDynamicPartitionedHashJoin(joinOp, context, maxSize)) {
+ if (convertJoinDynamicPartitionedHashJoin(joinOp, context)) {
return;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index b8c0102..b6db6aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.optimizer;
import java.util.ArrayList;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -43,7 +42,12 @@ import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
-import org.apache.hadoop.hive.ql.parse.*;
+import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
import org.apache.hadoop.hive.ql.plan.*;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
@@ -210,34 +214,16 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
} else {
LOG.debug("Column " + column + " is not a partition column");
if (semiJoin && ts.getConf().getFilterExpr() != null) {
- LOG.debug("Initiate semijoin reduction for " + column + " ("
- + ts.getConf().getFilterExpr().getExprString());
- // Get the table name from which the min-max values and bloom filter will come.
+ LOG.debug("Initiate semijoin reduction for " + column);
+ // Get the table name from which the min-max values will come.
Operator<?> op = ctx.generator;
-
while (!(op == null || op instanceof TableScanOperator)) {
op = op.getParentOperators().get(0);
}
String tableAlias = (op == null ? "" : ((TableScanOperator) op).getConf().getAlias());
-
keyBaseAlias = ctx.generator.getOperatorId() + "_" + tableAlias + "_" + column;
- Map<String, SemiJoinHint> hints = parseContext.getSemiJoinHints();
- if (hints != null) {
- // If hints map has no entry that would imply that user enforced
- // no runtime filtering.
- if (hints.size() > 0) {
- SemiJoinHint sjHint = hints.get(tableAlias);
- semiJoinAttempted = generateSemiJoinOperatorPlan(
- ctx, parseContext, ts, keyBaseAlias, sjHint);
- if (!semiJoinAttempted && sjHint != null) {
- throw new SemanticException("The user hint to enforce semijoin failed required conditions");
- }
- }
- } else {
- semiJoinAttempted = generateSemiJoinOperatorPlan(
- ctx, parseContext, ts, keyBaseAlias, null);
- }
+ semiJoinAttempted = generateSemiJoinOperatorPlan(ctx, parseContext, ts, keyBaseAlias);
}
}
@@ -400,13 +386,7 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
// Generates plan for min/max when dynamic partition pruning is ruled out.
private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContext parseContext,
- TableScanOperator ts, String keyBaseAlias, SemiJoinHint sjHint) throws SemanticException {
-
- // If semijoin hint is enforced, make sure hint is provided
- if (parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY)
- && sjHint == null) {
- return false;
- }
+ TableScanOperator ts, String keyBaseAlias) throws SemanticException {
// we will put a fork in the plan at the source of the reduce sink
Operator<? extends OperatorDesc> parentOfRS = ctx.generator.getParentOperators().get(0);
@@ -422,62 +402,33 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
exprNodeDesc = exprNodeDesc.getChildren().get(0);
}
- if (!(exprNodeDesc instanceof ExprNodeColumnDesc)) {
- // No column found!
- // Bail out
- return false;
- }
-
- internalColName = ((ExprNodeColumnDesc) exprNodeDesc).getColumn();
- if (parentOfRS instanceof SelectOperator) {
- // Make sure the semijoin branch is not on partition column.
- ExprNodeDesc expr = parentOfRS.getColumnExprMap().get(internalColName);
- while (!(expr instanceof ExprNodeColumnDesc) &&
- (expr.getChildren() != null)) {
- expr = expr.getChildren().get(0);
- }
-
- if (!(expr instanceof ExprNodeColumnDesc)) {
- // No column found!
- // Bail out
- return false;
- }
-
- ExprNodeColumnDesc colExpr = (ExprNodeColumnDesc) expr;
- String colName = ExprNodeDescUtils.extractColName(colExpr);
-
- // Fetch the TableScan Operator.
- Operator<?> op = parentOfRS.getParentOperators().get(0);
- while (op != null && !(op instanceof TableScanOperator)) {
- op = op.getParentOperators().get(0);
- }
- assert op != null;
+ if (exprNodeDesc instanceof ExprNodeColumnDesc) {
+ internalColName = ((ExprNodeColumnDesc) exprNodeDesc).getColumn();
+ if (parentOfRS instanceof SelectOperator) {
+ // Make sure the semijoin branch is not on partition column.
+ ExprNodeColumnDesc colExpr = ((ExprNodeColumnDesc) (parentOfRS.
+ getColumnExprMap().get(internalColName)));
+ String colName = ExprNodeDescUtils.extractColName(colExpr);
+
+ // Fetch the TableScan Operator.
+ Operator<?> op = parentOfRS.getParentOperators().get(0);
+ while (op != null && !(op instanceof TableScanOperator)) {
+ op = op.getParentOperators().get(0);
+ }
+ assert op != null;
- Table table = ((TableScanOperator) op).getConf().getTableMetadata();
- if (table.isPartitionKey(colName)) {
- // The column is partition column, skip the optimization.
- return false;
+ Table table = ((TableScanOperator) op).getConf().getTableMetadata();
+ if (table.isPartitionKey(colName)) {
+ // The column is partition column, skip the optimization.
+ return false;
+ }
}
- }
-
- // If hint is provided and only hinted semijoin optimizations should be
- // created, then skip other columns on the table
- if (parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY)
- && sjHint.getColName() != null &&
- !internalColName.equals(sjHint.getColName())) {
+ } else {
+ // No column found!
+ // Bail out
return false;
}
- // Check if there already exists a semijoin branch
- GroupByOperator gb = parseContext.getColExprToGBMap().get(key);
- if (gb != null) {
- // Already an existing semijoin branch, reuse it
- createFinalRsForSemiJoinOp(parseContext, ts, gb, key, keyBaseAlias,
- ctx.parent.getChildren().get(0), sjHint != null);
- // done!
- return true;
- }
-
List<ExprNodeDesc> keyExprs = new ArrayList<ExprNodeDesc>();
keyExprs.add(key);
@@ -521,6 +472,8 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
HiveConf.getFloatVar(parseContext.getConf(),
HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+ ArrayList<ExprNodeDesc> groupByExprs = new ArrayList<ExprNodeDesc>();
+
// Add min/max and bloom filter aggregations
List<ObjectInspector> aggFnOIs = new ArrayList<ObjectInspector>();
aggFnOIs.add(key.getWritableObjectInspector());
@@ -540,17 +493,9 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
AggregationDesc bloomFilter = new AggregationDesc("bloom_filter",
FunctionRegistry.getGenericUDAFEvaluator("bloom_filter", aggFnOIs, false, false),
params, false, Mode.PARTIAL1);
- GenericUDAFBloomFilterEvaluator bloomFilterEval =
- (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
+ GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
bloomFilterEval.setSourceOperator(selectOp);
-
- if (sjHint != null && sjHint.getNumEntries() > 0) {
- LOG.debug("Setting size for " + keyBaseAlias + " to " + sjHint.getNumEntries() + " based on the hint");
- bloomFilterEval.setHintEntries(sjHint.getNumEntries());
- }
bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES));
- bloomFilterEval.setMinEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MIN_BLOOM_FILTER_ENTRIES));
- bloomFilterEval.setFactor(parseContext.getConf().getFloatVar(ConfVars.TEZ_BLOOM_FILTER_FACTOR));
bloomFilter.setGenericUDAFWritableEvaluator(bloomFilterEval);
aggs.add(min);
aggs.add(max);
@@ -602,8 +547,6 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
Map<String, ExprNodeDesc> columnExprMap = new HashMap<String, ExprNodeDesc>();
rsOp.setColumnExprMap(columnExprMap);
- rsOp.getConf().setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.QUICKSTART));
-
// Create the final Group By Operator
ArrayList<AggregationDesc> aggsFinal = new ArrayList<AggregationDesc>();
try {
@@ -648,12 +591,7 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
bloomFilterFinalParams, false, Mode.FINAL);
GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
bloomFilterEval.setSourceOperator(selectOp);
- if (sjHint != null && sjHint.getNumEntries() > 0) {
- bloomFilterEval.setHintEntries(sjHint.getNumEntries());
- }
bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES));
- bloomFilterEval.setMinEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MIN_BLOOM_FILTER_ENTRIES));
- bloomFilterEval.setFactor(parseContext.getConf().getFloatVar(ConfVars.TEZ_BLOOM_FILTER_FACTOR));
bloomFilter.setGenericUDAFWritableEvaluator(bloomFilterEval);
aggsFinal.add(min);
@@ -679,56 +617,23 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
rsOp.getConf().setOutputOperators(outputOperators);
}
- createFinalRsForSemiJoinOp(parseContext, ts, groupByOpFinal, key,
- keyBaseAlias, ctx.parent.getChildren().get(0), sjHint != null);
-
- return true;
- }
-
- private void createFinalRsForSemiJoinOp(
- ParseContext parseContext, TableScanOperator ts, GroupByOperator gb,
- ExprNodeDesc key, String keyBaseAlias, ExprNodeDesc colExpr,
- boolean isHint) throws SemanticException {
- ArrayList<String> gbOutputNames = new ArrayList<>();
- // One each for min, max and bloom filter
- gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(0));
- gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(1));
- gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(2));
-
- int colPos = 0;
- ArrayList<ExprNodeDesc> rsValueCols = new ArrayList<ExprNodeDesc>();
- for (int i = 0; i < gbOutputNames.size() - 1; i++) {
- ExprNodeColumnDesc expr = new ExprNodeColumnDesc(key.getTypeInfo(),
- gbOutputNames.get(colPos++), "", false);
- rsValueCols.add(expr);
- }
-
- // Bloom Filter uses binary
- ExprNodeColumnDesc colBFExpr = new ExprNodeColumnDesc(TypeInfoFactory.binaryTypeInfo,
- gbOutputNames.get(colPos++), "", false);
- rsValueCols.add(colBFExpr);
-
// Create the final Reduce Sink Operator
ReduceSinkDesc rsDescFinal = PlanUtils.getReduceSinkDesc(
new ArrayList<ExprNodeDesc>(), rsValueCols, gbOutputNames, false,
-1, 0, 1, Operation.NOT_ACID);
ReduceSinkOperator rsOpFinal = (ReduceSinkOperator)OperatorFactory.getAndMakeChild(
- rsDescFinal, new RowSchema(gb.getSchema()), gb);
- Map<String, ExprNodeDesc> columnExprMap = new HashMap<>();
+ rsDescFinal, new RowSchema(groupByOpFinal.getSchema()), groupByOpFinal);
rsOpFinal.setColumnExprMap(columnExprMap);
- LOG.debug("DynamicSemiJoinPushdown: Saving RS to TS mapping: " + rsOpFinal + ": " + ts);
- SemiJoinBranchInfo sjInfo = new SemiJoinBranchInfo(ts, isHint);
- parseContext.getRsToSemiJoinBranchInfo().put(rsOpFinal, sjInfo);
+ LOG.debug("DynamicMinMaxPushdown: Saving RS to TS mapping: " + rsOpFinal + ": " + ts);
+ parseContext.getRsOpToTsOpMap().put(rsOpFinal, ts);
// for explain purpose
- if (parseContext.getContext().getExplainConfig() != null &&
- parseContext.getContext().getExplainConfig().isFormatted()) {
- List<String> outputOperators = rsOpFinal.getConf().getOutputOperators();
- if (outputOperators == null) {
- outputOperators = new ArrayList<>();
- }
+ if (parseContext.getContext().getExplainConfig() != null
+ && parseContext.getContext().getExplainConfig().isFormatted()) {
+ List<String> outputOperators = new ArrayList<>();
outputOperators.add(ts.getOperatorId());
+ rsOpFinal.getConf().setOutputOperators(outputOperators);
}
// Save the info that is required at query time to resolve dynamic/runtime values.
@@ -743,9 +648,9 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
runtimeValuesInfo.setTableDesc(rsFinalTableDesc);
runtimeValuesInfo.setDynamicValueIDs(dynamicValueIDs);
runtimeValuesInfo.setColExprs(rsValueCols);
- runtimeValuesInfo.setTsColExpr(colExpr);
parseContext.getRsToRuntimeValuesInfoMap().put(rsOpFinal, runtimeValuesInfo);
- parseContext.getColExprToGBMap().put(key, gb);
+
+ return true;
}
private Map<Node, Object> collectDynamicPruningConditions(ExprNodeDesc pred, NodeProcessorCtx ctx)