You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/11/10 01:13:20 UTC
svn commit: r1407712 - in /incubator/hcatalog/trunk: ./
shims/src/20/java/org/apache/hcatalog/shims/
shims/src/23/java/org/apache/hcatalog/shims/
src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/shims/
Author: toffer
Date: Sat Nov 10 01:13:19 2012
New Revision: 1407712
URL: http://svn.apache.org/viewvc?rev=1407712&view=rev
Log:
HCATALOG-538 HCatalogStorer fails for 100GB of data with dynamic partitioning, number of partition is 300 (amalakar via toffer)
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
incubator/hcatalog/trunk/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1407712&r1=1407711&r2=1407712&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Sat Nov 10 01:13:19 2012
@@ -44,6 +44,8 @@ Trunk (unreleased changes)
HCAT-427 Document storage-based authorization (lefty via gates)
IMPROVEMENTS
+ HCAT-538 HCatalogStorer fails for 100GB of data with dynamic partitioning, number of partition is 300 (amalakar via toffer)
+
HCAT-532 HiveClientCache shutdown hook should log at debug level (traviscrawford)
HCAT-528 ivysettings.xml does not let you override .m2/repository or ivy.cache.dir setting (raja@cmbasics.com via traviscrawford)
Modified: incubator/hcatalog/trunk/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java?rev=1407712&r1=1407711&r2=1407712&view=diff
==============================================================================
--- incubator/hcatalog/trunk/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java (original)
+++ incubator/hcatalog/trunk/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java Sat Nov 10 01:13:19 2012
@@ -23,6 +23,8 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
@@ -140,4 +142,10 @@ public class HCatHadoopShims20S implemen
return "";
}
+
+ @Override
+ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+ // In hadoop 1.x.x the file system URI is sufficient to determine the URI of the file
+ return "hdfs".equals(fs.getUri().getScheme());
+ }
}
Modified: incubator/hcatalog/trunk/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java?rev=1407712&r1=1407711&r2=1407712&view=diff
==============================================================================
--- incubator/hcatalog/trunk/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java (original)
+++ incubator/hcatalog/trunk/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java Sat Nov 10 01:13:19 2012
@@ -32,6 +32,9 @@ import org.apache.hadoop.mapreduce.TaskI
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.net.NetUtils;
@@ -116,4 +119,11 @@ public class HCatHadoopShims23 implement
return "";
}
+
+ @Override
+ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+ // In the case of viewfs we need to look up where the actual file is to know the filesystem in use.
+ // resolvePath is a reliable way of knowing which file system the file is on.
+ return "hdfs".equals(fs.resolvePath(path).toUri().getScheme());
+ }
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1407712&r1=1407711&r2=1407712&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Sat Nov 10 01:13:19 2012
@@ -47,7 +47,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
@@ -161,8 +160,7 @@ class FileOutputCommitterContainer exten
Path src;
OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
if (dynamicPartitioningUsed) {
- src = new Path(getPartitionRootLocation(jobInfo.getLocation()
- .toString(), jobInfo.getTableInfo().getTable()
+ src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable()
.getPartitionKeysSize()));
} else {
src = new Path(jobInfo.getLocation());
@@ -205,18 +203,16 @@ class FileOutputCommitterContainer exten
OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
if (getOutputDirMarking(jobContext.getConfiguration())) {
Path outputPath = new Path(jobInfo.getLocation());
- if (outputPath != null) {
- FileSystem fileSys = outputPath.getFileSystem(jobContext
- .getConfiguration());
- // create a file in the folder to mark it
- if (fileSys.exists(outputPath)) {
- Path filePath = new Path(outputPath,
- SUCCEEDED_FILE_NAME);
- if (!fileSys.exists(filePath)) { // may have been
- // created by
- // baseCommitter.commitJob()
- fileSys.create(filePath).close();
- }
+ FileSystem fileSys = outputPath.getFileSystem(jobContext
+ .getConfiguration());
+ // create a file in the folder to mark it
+ if (fileSys.exists(outputPath)) {
+ Path filePath = new Path(outputPath,
+ SUCCEEDED_FILE_NAME);
+ if (!fileSys.exists(filePath)) { // may have been
+ // created by
+ // baseCommitter.commitJob()
+ fileSys.create(filePath).close();
}
}
}
@@ -303,7 +299,10 @@ class FileOutputCommitterContainer exten
}
// Apply the group and permissions to the leaf partition and files.
- applyGroupAndPerms(fs, partPath, perms, grpName, true);
+ // Need not bother in case of HDFS as permission is taken care of by setting UMask
+ if (!HCatHadoopShims.Instance.get().isFileInHDFS(fs, partPath)) {
+ applyGroupAndPerms(fs, partPath, perms, grpName, true);
+ }
// Set the location in the StorageDescriptor
if (dynamicPartitioningUsed) {
@@ -325,22 +324,12 @@ class FileOutputCommitterContainer exten
String group, boolean recursive)
throws IOException {
fs.setPermission(dir, permission);
- try {
- fs.setOwner(dir, null, group);
- } catch (AccessControlException ace) {
- LOG.warn("Error changing group of " + dir, ace);
- }
if (recursive) {
for (FileStatus fileStatus : fs.listStatus(dir)) {
if (fileStatus.isDir()) {
- applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, recursive);
+ applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, true);
} else {
fs.setPermission(fileStatus.getPath(), permission);
- try {
- fs.setOwner(dir, null, group);
- } catch (AccessControlException ace) {
- LOG.warn("Error changing group of " + dir, ace);
- }
}
}
}
@@ -387,6 +376,7 @@ class FileOutputCommitterContainer exten
private void updateTableSchema(HiveMetaStoreClient client, Table table,
HCatSchema partitionSchema) throws IOException, InvalidOperationException, MetaException, TException {
+
List<FieldSchema> newColumns = HCatUtil.validatePartitionSchema(table, partitionSchema);
if (newColumns.size() != 0) {
@@ -403,46 +393,88 @@ class FileOutputCommitterContainer exten
* Move all of the files from the temp directory to the final location
* @param fs the output file system
* @param file the file to move
- * @param src the source directory
- * @param dest the target directory
+ * @param srcDir the source directory
+ * @param destDir the target directory
* @param dryRun - a flag that simply tests if this move would succeed or not based
* on whether other files exist where we're trying to copy
* @throws java.io.IOException
*/
private void moveTaskOutputs(FileSystem fs,
Path file,
- Path src,
- Path dest, boolean dryRun) throws IOException {
- if (fs.isFile(file)) {
- Path finalOutputPath = getFinalPath(file, src, dest);
+ Path srcDir,
+ Path destDir, final boolean dryRun) throws IOException {
- if (dryRun) {
-// LOG.info("Testing if moving ["+file+"] to ["+finalOutputPath+"] would cause a problem");
+ final Path finalOutputPath = getFinalPath(file, srcDir, destDir);
+ if (fs.isFile(file)) {
+ if (dryRun){
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Testing if moving file: [" + file + "] to ["
+ + finalOutputPath + "] would cause a problem");
+ }
if (fs.exists(finalOutputPath)) {
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + ", duplicate publish possible.");
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
+ + ", duplicate publish not possible.");
}
} else {
-// LOG.info("Moving ["+file+"] to ["+finalOutputPath+"]");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Moving file: [ " + file + "] to [" + finalOutputPath + "]");
+ }
if (!fs.rename(file, finalOutputPath)) {
if (!fs.delete(finalOutputPath, true)) {
throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
}
if (!fs.rename(file, finalOutputPath)) {
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest);
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + finalOutputPath);
}
}
}
- } else if (fs.getFileStatus(file).isDir()) {
- FileStatus[] paths = fs.listStatus(file);
- Path finalOutputPath = getFinalPath(file, src, dest);
- if (!dryRun) {
- fs.mkdirs(finalOutputPath);
- }
- if (paths != null) {
- for (FileStatus path : paths) {
- moveTaskOutputs(fs, path.getPath(), src, dest, dryRun);
+ } else if(fs.getFileStatus(file).isDir()) {
+ FileStatus[] children = fs.listStatus(file);
+ if (children != null && children.length > 0) {
+ FileStatus firstChild = children[0];
+ if(firstChild.isDir()) {
+ // If the first child is a directory, then the rest would be directories too according to the HCatalog dir structure
+ // recurse in that case
+ for (FileStatus child : children) {
+ moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+ }
+ } else {
+
+
+ if (!dryRun) {
+ if (dynamicPartitioningUsed) {
+ // Optimization: if the first child is file, we have reached the leaf directory, move the parent directory itself
+ // instead of moving each file under the directory. See HCATALOG-538
+
+ final Path parentDir = finalOutputPath.getParent();
+ // Create the directory
+ fs.mkdirs(parentDir);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Moving directory: " + file + " to " + parentDir);
+ }
+ if (!fs.rename(file, parentDir)) {
+ final String msg = "Failed to move file: " + file + " to " + parentDir;
+ LOG.error(msg);
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
+ }
+ } else {
+ // In case of no partition we have to move each file
+ for (FileStatus child : children) {
+ moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+ }
+ }
+ } else {
+ if(fs.exists(finalOutputPath)) {
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
+ + ", duplicate publish not possible.");
+ }
+ }
}
}
+ } else {
+ // Should never happen
+ final String msg = "Unknown file type being asked to be moved, erroring out";
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
}
}
@@ -603,12 +635,12 @@ class FileOutputCommitterContainer exten
Path src = new Path(ptnRootLocation);
// check here for each dir we're copying out, to see if it
// already exists, error out if so
- moveTaskOutputs(fs, src, src, tblPath,true);
- moveTaskOutputs(fs, src, src, tblPath,false);
+ moveTaskOutputs(fs, src, src, tblPath, true);
+ moveTaskOutputs(fs, src, src, tblPath, false);
fs.delete(src, true);
try {
updateTableSchema(client, table, jobInfo.getOutputSchema());
- LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
+ LOG.info("HAR is being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
client.add_partitions(partitionsToAdd);
partitionsAdded = partitionsToAdd;
} catch (Exception e){
@@ -624,17 +656,16 @@ class FileOutputCommitterContainer exten
}else{
// no harProcessor, regular operation
- // No duplicate partition publish case to worry about because we'll
- // get a AlreadyExistsException here if so, and appropriately rollback
updateTableSchema(client, table, jobInfo.getOutputSchema());
- LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
- client.add_partitions(partitionsToAdd);
+ LOG.info("HAR is not being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
partitionsAdded = partitionsToAdd;
if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
Path src = new Path(ptnRootLocation);
- moveTaskOutputs(fs, src, src, tblPath,false);
+ moveTaskOutputs(fs, src, src, tblPath, true);
+ moveTaskOutputs(fs, src, src, tblPath, false);
fs.delete(src, true);
}
+ client.add_partitions(partitionsToAdd);
}
} catch (Exception e) {
if (partitionsAdded.size() > 0) {
@@ -680,7 +711,7 @@ class FileOutputCommitterContainer exten
client.cancelDelegationToken(tokenStrForm);
}
} catch (MetaException e) {
- LOG.warn("MetaException while cancelling delegation token.",e );
+ LOG.warn("MetaException while cancelling delegation token.", e);
} catch (TException e) {
LOG.warn("TException while cancelling delegation token.", e);
} finally {
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java?rev=1407712&r1=1407711&r2=1407712&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java Sat Nov 10 01:13:19 2012
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
@@ -88,4 +90,14 @@ public interface HCatHadoopShims {
public InetSocketAddress getResourceManagerAddress(Configuration conf);
public String getPropertyName(PropertyName name);
+
+ /**
+ * Checks if file is in HDFS filesystem.
+ *
+ * @param fs
+ * @param path
+ * @return true if the file is in HDFS, false if the file is in other file systems.
+ */
+ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException;
+
}