Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/11/10 01:13:20 UTC

svn commit: r1407712 - in /incubator/hcatalog/trunk: ./ shims/src/20/java/org/apache/hcatalog/shims/ shims/src/23/java/org/apache/hcatalog/shims/ src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/shims/

Author: toffer
Date: Sat Nov 10 01:13:19 2012
New Revision: 1407712

URL: http://svn.apache.org/viewvc?rev=1407712&view=rev
Log:
HCATALOG-538 HCatalogStorer fails for 100GB of data with dynamic partitioning, number of partitions is 300 (amalakar via toffer)

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
    incubator/hcatalog/trunk/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1407712&r1=1407711&r2=1407712&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Sat Nov 10 01:13:19 2012
@@ -44,6 +44,8 @@ Trunk (unreleased changes)
   HCAT-427 Document storage-based authorization (lefty via gates)
 
   IMPROVEMENTS
+  HCAT-538 HCatalogStorer fails for 100GB of data with dynamic partitioning, number of partitions is 300 (amalakar via toffer)
+
   HCAT-532 HiveClientCache shutdown hook should log at debug level (traviscrawford)
 
   HCAT-528 ivysettings.xml does not let you override .m2/repository or ivy.cache.dir setting (raja@cmbasics.com via traviscrawford)

Modified: incubator/hcatalog/trunk/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java?rev=1407712&r1=1407711&r2=1407712&view=diff
==============================================================================
--- incubator/hcatalog/trunk/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java (original)
+++ incubator/hcatalog/trunk/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java Sat Nov 10 01:13:19 2012
@@ -23,6 +23,8 @@ import java.net.InetSocketAddress;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -140,4 +142,10 @@ public class HCatHadoopShims20S implemen
 
         return "";
     }
+
+    @Override
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+        // In Hadoop 1.x the filesystem's URI is sufficient to determine the scheme of the file.
+        return "hdfs".equals(fs.getUri().getScheme());
+    }
 }
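
For reference, the Hadoop 1.x check above can be exercised in isolation. A minimal sketch, assuming a Configuration whose default filesystem is reachable (the class name and sample path below are illustrative, not part of this commit):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SchemeCheckSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/user/hcat/output"); // hypothetical output path
        // Same logic as the shim: Hadoop 1.x has no client-side path
        // indirection, so the filesystem's own URI scheme is decisive.
        boolean inHdfs = "hdfs".equals(fs.getUri().getScheme());
        System.out.println(path + " in HDFS? " + inHdfs);
    }
}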

Modified: incubator/hcatalog/trunk/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java?rev=1407712&r1=1407711&r2=1407712&view=diff
==============================================================================
--- incubator/hcatalog/trunk/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java (original)
+++ incubator/hcatalog/trunk/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java Sat Nov 10 01:13:19 2012
@@ -32,6 +32,9 @@ import org.apache.hadoop.mapreduce.TaskI
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+
 
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.net.NetUtils;
@@ -116,4 +119,11 @@ public class HCatHadoopShims23 implement
 
         return "";
     }
+
+    @Override
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+        // In the case of viewfs we need to look up where the actual file lives to know which filesystem is in use.
+        // resolvePath is a reliable way of determining which filesystem the file actually resides on.
+        return "hdfs".equals(fs.resolvePath(path).toUri().getScheme());
+    }
 }
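
The 0.23 shim differs because of viewfs. A minimal sketch of why resolvePath() is needed, assuming a Hadoop 0.23+ client with a viewfs mount table configured (the mount URI below is hypothetical):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ViewFsResolveSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path("viewfs://cluster/user/hcat/output");
        FileSystem fs = path.getFileSystem(conf);
        // fs.getUri() would report "viewfs" here, which says nothing about the
        // backing store; resolvePath() follows the mount table to the concrete
        // filesystem, whose scheme is the one that matters.
        boolean inHdfs = "hdfs".equals(fs.resolvePath(path).toUri().getScheme());
        System.out.println(path + " resolves to HDFS? " + inHdfs);
    }
}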

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1407712&r1=1407711&r2=1407712&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Sat Nov 10 01:13:19 2012
@@ -47,7 +47,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
@@ -161,8 +160,7 @@ class FileOutputCommitterContainer exten
             Path src;
             OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
             if (dynamicPartitioningUsed) {
-                src = new Path(getPartitionRootLocation(jobInfo.getLocation()
-                        .toString(), jobInfo.getTableInfo().getTable()
+                src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable()
                         .getPartitionKeysSize()));
             } else {
                 src = new Path(jobInfo.getLocation());
@@ -205,18 +203,16 @@ class FileOutputCommitterContainer exten
             OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
             if (getOutputDirMarking(jobContext.getConfiguration())) {
                 Path outputPath = new Path(jobInfo.getLocation());
-                if (outputPath != null) {
-                    FileSystem fileSys = outputPath.getFileSystem(jobContext
-                            .getConfiguration());
-                    // create a file in the folder to mark it
-                    if (fileSys.exists(outputPath)) {
-                        Path filePath = new Path(outputPath,
-                                SUCCEEDED_FILE_NAME);
-                        if (!fileSys.exists(filePath)) { // may have been
-                                                         // created by
-                                                         // baseCommitter.commitJob()
-                            fileSys.create(filePath).close();
-                        }
+                FileSystem fileSys = outputPath.getFileSystem(jobContext
+                        .getConfiguration());
+                // create a file in the folder to mark it
+                if (fileSys.exists(outputPath)) {
+                    Path filePath = new Path(outputPath,
+                            SUCCEEDED_FILE_NAME);
+                    if (!fileSys.exists(filePath)) { // may have been
+                                                     // created by
+                                                     // baseCommitter.commitJob()
+                        fileSys.create(filePath).close();
                     }
                 }
             }
@@ -303,7 +299,10 @@ class FileOutputCommitterContainer exten
         }
 
         // Apply the group and permissions to the leaf partition and files.
-        applyGroupAndPerms(fs, partPath, perms, grpName, true);
+        // No need to bother in the case of HDFS, as permissions are already taken care of by the umask setting.
+        if (!HCatHadoopShims.Instance.get().isFileInHDFS(fs, partPath)) {
+            applyGroupAndPerms(fs, partPath, perms, grpName, true);
+        }
 
         // Set the location in the StorageDescriptor
         if (dynamicPartitioningUsed) {
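
The guard above assumes HDFS already applies the configured umask when files and directories are created, making a recursive setPermission pass redundant. A minimal sketch of that reasoning (the demo path is hypothetical, and the umask configuration key differs across Hadoop versions, e.g. fs.permissions.umask-mode in 2.x):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class UmaskSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // On HDFS, new files and directories get (requested mode & ~umask)
        // at create time, so chmod-ing freshly written partitions again is
        // pure overhead, which is exactly what the committer now skips.
        FsPermission umask = FsPermission.getUMask(conf);
        System.out.println("Effective umask: " + umask);
        fs.mkdirs(new Path("/tmp/hcat-umask-demo")); // perms applied on create
    }
}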
@@ -325,22 +324,12 @@ class FileOutputCommitterContainer exten
                                     String group, boolean recursive)
         throws IOException {
         fs.setPermission(dir, permission);
-        try {
-            fs.setOwner(dir, null, group);
-        } catch (AccessControlException ace) {
-            LOG.warn("Error changing group of " + dir, ace);
-        }
         if (recursive) {
             for (FileStatus fileStatus : fs.listStatus(dir)) {
                 if (fileStatus.isDir()) {
-                    applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, recursive);
+                    applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, true);
                 } else {
                     fs.setPermission(fileStatus.getPath(), permission);
-                    try {
-                        fs.setOwner(dir, null, group);
-                    } catch (AccessControlException ace) {
-                        LOG.warn("Error changing group of " + dir, ace);
-                    }
                 }
             }
         }
@@ -387,6 +376,7 @@ class FileOutputCommitterContainer exten
     private void updateTableSchema(HiveMetaStoreClient client, Table table,
                                    HCatSchema partitionSchema) throws IOException, InvalidOperationException, MetaException, TException {
 
+
         List<FieldSchema> newColumns = HCatUtil.validatePartitionSchema(table, partitionSchema);
 
         if (newColumns.size() != 0) {
@@ -403,46 +393,88 @@ class FileOutputCommitterContainer exten
      * Move all of the files from the temp directory to the final location
      * @param fs the output file system
      * @param file the file to move
-     * @param src the source directory
-     * @param dest the target directory
+     * @param srcDir the source directory
+     * @param destDir the target directory
     * @param dryRun - a flag that only tests whether this move would succeed, based
     *                 on whether files already exist at the destination
      * @throws java.io.IOException
      */
     private void moveTaskOutputs(FileSystem fs,
                                  Path file,
-                                 Path src,
-                                 Path dest, boolean dryRun) throws IOException {
-        if (fs.isFile(file)) {
-            Path finalOutputPath = getFinalPath(file, src, dest);
+                                 Path srcDir,
+                                 Path destDir, final boolean dryRun) throws IOException {
 
-            if (dryRun) {
-//        LOG.info("Testing if moving ["+file+"] to ["+finalOutputPath+"] would cause a problem");
+        final Path finalOutputPath = getFinalPath(file, srcDir, destDir);
+        if (fs.isFile(file)) {
+            if (dryRun){
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Testing if moving file: [" + file + "] to ["
+                            + finalOutputPath + "] would cause a problem");
+                }
                 if (fs.exists(finalOutputPath)) {
-                    throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + ", duplicate publish possible.");
+                    throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
+                            + ", duplicate publish not possible.");
                 }
             } else {
-//        LOG.info("Moving ["+file+"] to ["+finalOutputPath+"]");
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Moving file: [ " + file + "] to [" + finalOutputPath + "]");
+                }
                 if (!fs.rename(file, finalOutputPath)) {
                     if (!fs.delete(finalOutputPath, true)) {
                         throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
                     }
                     if (!fs.rename(file, finalOutputPath)) {
-                        throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest);
+                        throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + finalOutputPath);
                     }
                 }
             }
-        } else if (fs.getFileStatus(file).isDir()) {
-            FileStatus[] paths = fs.listStatus(file);
-            Path finalOutputPath = getFinalPath(file, src, dest);
-            if (!dryRun) {
-                fs.mkdirs(finalOutputPath);
-            }
-            if (paths != null) {
-                for (FileStatus path : paths) {
-                    moveTaskOutputs(fs, path.getPath(), src, dest, dryRun);
+        } else if(fs.getFileStatus(file).isDir()) {
+            FileStatus[] children = fs.listStatus(file);
+            if (children != null && children.length > 0) {
+                FileStatus firstChild = children[0];
+                if(firstChild.isDir()) {
+                    // If the first child is a directory, the rest will be directories too, per the HCatalog directory layout;
+                    // recurse in that case.
+                    for (FileStatus child : children) {
+                        moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+                    }
+                } else {
+
+
+                    if (!dryRun) {
+                        if (dynamicPartitioningUsed) {
+                            // Optimization: if the first child is a file, we have reached a leaf directory; move the leaf
+                            // directory itself instead of moving each file under it. See HCATALOG-538
+
+                            final Path parentDir = finalOutputPath.getParent();
+                            // Create the directory
+                            fs.mkdirs(parentDir);
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Moving directory: " + file + " to " + parentDir);
+                            }
+                            if (!fs.rename(file, parentDir)) {
+                                final String msg = "Failed to move directory: " + file + " to " + parentDir;
+                                LOG.error(msg);
+                                throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
+                            }
+                        } else {
+                            // When dynamic partitioning is not used, we have to move each file individually.
+                            for (FileStatus child : children) {
+                                moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+                            }
+                        }
+                    } else {
+                        if(fs.exists(finalOutputPath)) {
+                            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
+                                    + ", duplicate publish not possible.");
+                        }
+                    }
                 }
             }
+        } else {
+            // Should never happen
+            final String msg = "Unknown file type encountered while moving task outputs, erroring out";
+            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
         }
     }
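
The directory branch above is the heart of this fix: with dynamic partitioning, once a directory's first child is a file the walk has reached a leaf partition directory, and a single rename moves the entire directory. A condensed standalone sketch of that walk, using the same Hadoop 1.x-era FileStatus.isDir() call the patch uses (class name and path handling are simplified and illustrative):

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LeafDirMoveSketch {
    // Recurse until a directory whose children are files is found, then
    // rename that whole directory with one filesystem call.
    static void moveLeaves(FileSystem fs, Path dir, Path srcRoot, Path destRoot)
            throws IOException {
        FileStatus[] children = fs.listStatus(dir);
        if (children == null || children.length == 0) {
            return;
        }
        if (children[0].isDir()) {
            for (FileStatus child : children) {
                moveLeaves(fs, child.getPath(), srcRoot, destRoot);
            }
        } else {
            // One rename per leaf partition directory: with 300 dynamic
            // partitions this costs ~300 namenode operations instead of
            // one per output file (the per-file cost HCATALOG-538 addresses).
            Path relative = new Path(dir.toString()
                    .substring(srcRoot.toString().length() + 1));
            Path dest = new Path(destRoot, relative);
            fs.mkdirs(dest.getParent()); // ensure the target parent exists
            if (!fs.rename(dir, dest.getParent())) {
                throw new IOException("Failed to move " + dir + " into " + dest.getParent());
            }
        }
    }
}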
 
@@ -603,12 +635,12 @@ class FileOutputCommitterContainer exten
                 Path src = new Path(ptnRootLocation);
                 // check here for each dir we're copying out, to see if it
                 // already exists, error out if so
-                moveTaskOutputs(fs, src, src, tblPath,true);
-                moveTaskOutputs(fs, src, src, tblPath,false);
+                moveTaskOutputs(fs, src, src, tblPath, true);
+                moveTaskOutputs(fs, src, src, tblPath, false);
                 fs.delete(src, true);
                 try {
                     updateTableSchema(client, table, jobInfo.getOutputSchema());
-                    LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
+                    LOG.info("HAR is being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
                     client.add_partitions(partitionsToAdd);
                     partitionsAdded = partitionsToAdd;
                 } catch (Exception e){
@@ -624,17 +656,16 @@ class FileOutputCommitterContainer exten
 
             }else{
                 // no harProcessor, regular operation
-                // No duplicate partition publish case to worry about because we'll
-                // get a AlreadyExistsException here if so, and appropriately rollback
                 updateTableSchema(client, table, jobInfo.getOutputSchema());
-                LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
-                client.add_partitions(partitionsToAdd);
+                LOG.info("HAR not is not being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
                 partitionsAdded = partitionsToAdd;
                 if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
                     Path src = new Path(ptnRootLocation);
-                    moveTaskOutputs(fs, src, src, tblPath,false);
+                    moveTaskOutputs(fs, src, src, tblPath, true);
+                    moveTaskOutputs(fs, src, src, tblPath, false);
                     fs.delete(src, true);
                 }
+                client.add_partitions(partitionsToAdd);
             }
         } catch (Exception e) {
             if (partitionsAdded.size() > 0) {
@@ -680,7 +711,7 @@ class FileOutputCommitterContainer exten
                 client.cancelDelegationToken(tokenStrForm);
             }
         } catch (MetaException e) {
-            LOG.warn("MetaException while cancelling delegation token.",e );
+            LOG.warn("MetaException while cancelling delegation token.", e);
         } catch (TException e) {
             LOG.warn("TException while cancelling delegation token.", e);
         } finally {

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java?rev=1407712&r1=1407711&r2=1407712&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java Sat Nov 10 01:13:19 2012
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
@@ -88,4 +90,14 @@ public interface HCatHadoopShims {
     public InetSocketAddress getResourceManagerAddress(Configuration conf);
 
     public String getPropertyName(PropertyName name);
+
+    /**
+     * Checks whether a file is in the HDFS filesystem.
+     *
+     * @param fs the filesystem the path belongs to
+     * @param path the path to check
+     * @return true if the file is in HDFS, false if it is in another filesystem.
+     */
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException;
+
 }
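
A minimal, hypothetical call site for the new method, mirroring the committer change above; Instance.get() selects the shim matching the running Hadoop version, and the partition path below is illustrative:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hcatalog.shims.HCatHadoopShims;

public class ShimUsageSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path partPath = new Path("/warehouse/mytable/ds=2012-11-10");
        FileSystem fs = partPath.getFileSystem(conf);
        // Skip the recursive group/permission walk when the data already
        // lives on HDFS, exactly as FileOutputCommitterContainer now does.
        if (!HCatHadoopShims.Instance.get().isFileInHDFS(fs, partPath)) {
            System.out.println("Non-HDFS store: would apply group and perms recursively.");
        }
    }
}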