Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/11/10 01:14:45 UTC

svn commit: r1407714 - in /incubator/hcatalog/branches/branch-0.4: ./ shims/src/20/java/org/apache/hcatalog/shims/ shims/src/23/java/org/apache/hcatalog/shims/ src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/shims/

Author: toffer
Date: Sat Nov 10 01:14:44 2012
New Revision: 1407714

URL: http://svn.apache.org/viewvc?rev=1407714&view=rev
Log:
backported from trunk: HCATALOG-538 HCatalogStorer fails for 100GB of data with dynamic partitioning, number of partition is 300 (amalakar via toffer)

Modified:
    incubator/hcatalog/branches/branch-0.4/CHANGES.txt
    incubator/hcatalog/branches/branch-0.4/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
    incubator/hcatalog/branches/branch-0.4/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java

Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1407714&r1=1407713&r2=1407714&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Sat Nov 10 01:14:44 2012
@@ -38,6 +38,8 @@ Trunk (unreleased changes)
   HCAT-427 Document storage-based authorization (lefty via gates)
 
   IMPROVEMENTS
+  HCAT-538 HCatalogStorer fails for 100GB of data with dynamic partitioning, number of partition is 300 (amalakar via toffer)
+
   HCAT-492 Document CTAS workaround for Hive with JSON serde (lefty via khorgath)
 
   HCAT-389 hcat_ping (script to check if HCatalog server is running/reachable) (mithun via khorgath)

Modified: incubator/hcatalog/branches/branch-0.4/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java?rev=1407714&r1=1407713&r2=1407714&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java (original)
+++ incubator/hcatalog/branches/branch-0.4/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java Sat Nov 10 01:14:44 2012
@@ -5,6 +5,8 @@ import java.net.InetSocketAddress;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -125,4 +127,10 @@ public class HCatHadoopShims20S implemen
 
         return "";
     }
+
+    @Override
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+        // In Hadoop 1.x the filesystem URI scheme is sufficient to tell whether the file is in HDFS
+        return "hdfs".equals(fs.getUri().getScheme());
+    }
 }
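
For context, a minimal sketch (not part of this commit) of what the Hadoop 1.x
check relies on: the scheme of the FileSystem URI. The NameNode address below
is a hypothetical placeholder.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class SchemeCheckSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // A FileSystem built from an hdfs:// URI reports "hdfs" as its
            // scheme, so the shim above returns true for it.
            FileSystem hdfs = FileSystem.get(URI.create("hdfs://namenode:8020/"), conf);
            System.out.println(hdfs.getUri().getScheme()); // "hdfs"
            // The local filesystem reports "file", so the shim returns false.
            FileSystem local = FileSystem.get(URI.create("file:///"), conf);
            System.out.println(local.getUri().getScheme()); // "file"
        }
    }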

Modified: incubator/hcatalog/branches/branch-0.4/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java?rev=1407714&r1=1407713&r2=1407714&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java (original)
+++ incubator/hcatalog/branches/branch-0.4/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java Sat Nov 10 01:14:44 2012
@@ -33,6 +33,9 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.util.Progressable;
 import org.apache.pig.ResourceSchema;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+
 
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.net.NetUtils;
@@ -118,4 +121,11 @@ public class HCatHadoopShims23 implement
 
         return "";
     }
+
+    @Override
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+        // In the case of viewfs we need to look up where the actual file resides to know the filesystem in use.
+        // resolvePath is a reliable way of determining which filesystem the file is on.
+        return "hdfs".equals(fs.resolvePath(path).toUri().getScheme());
+    }
 }
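
The viewfs case is why the shim must do more work on Hadoop 0.23/2.x: a
client-side viewfs mount reports "viewfs" as its own scheme even when the
mount target is HDFS, so only resolving the path reveals the backing
filesystem. A hedged sketch (the mount table name and paths are hypothetical,
and resolvePath requires the path to exist):

    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "viewfs://cluster/");
    conf.set("fs.viewfs.mounttable.cluster.link./data", "hdfs://namenode:8020/data");

    FileSystem fs = FileSystem.get(conf);
    Path p = new Path("/data/part-00000");
    System.out.println(fs.getUri().getScheme());               // "viewfs"
    // resolvePath follows the mount table to the real filesystem.
    System.out.println(fs.resolvePath(p).toUri().getScheme()); // "hdfs"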

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1407714&r1=1407713&r2=1407714&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Sat Nov 10 01:14:44 2012
@@ -35,12 +35,9 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.HCatMapRedUtil;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
@@ -54,11 +51,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedWriter;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.Writer;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -176,18 +169,16 @@ class FileOutputCommitterContainer exten
             OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
             if (getOutputDirMarking(jobContext.getConfiguration())) {
                 Path outputPath = new Path(jobInfo.getLocation());
-                if (outputPath != null) {
-                    FileSystem fileSys = outputPath.getFileSystem(jobContext
-                            .getConfiguration());
-                    // create a file in the folder to mark it
-                    if (fileSys.exists(outputPath)) {
-                        Path filePath = new Path(outputPath,
-                                SUCCEEDED_FILE_NAME);
-                        if (!fileSys.exists(filePath)) { // may have been
-                                                         // created by
-                                                         // baseCommitter.commitJob()
-                            fileSys.create(filePath).close();
-                        }
+                FileSystem fileSys = outputPath.getFileSystem(jobContext
+                        .getConfiguration());
+                // create a file in the folder to mark it
+                if (fileSys.exists(outputPath)) {
+                    Path filePath = new Path(outputPath,
+                            SUCCEEDED_FILE_NAME);
+                    if (!fileSys.exists(filePath)) { // may have been
+                                                     // created by
+                                                     // baseCommitter.commitJob()
+                        fileSys.create(filePath).close();
                     }
                 }
             }
@@ -266,7 +257,10 @@ class FileOutputCommitterContainer exten
             partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
         }
         // Apply the group and permissions to the leaf partition and files.
-        applyGroupAndPerms(fs, partPath, perms, grpName, true);
+        // No need to bother in the case of HDFS, as permissions are taken care of by the configured UMask
+        if(!HCatHadoopShims.Instance.get().isFileInHDFS(fs, partPath)) {
+            applyGroupAndPerms(fs, partPath, perms, grpName, true);
+        }
         if (dynamicPartitioningUsed){
             String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table,partKVs);
             if (harProcessor.isEnabled()){
@@ -287,22 +281,12 @@ class FileOutputCommitterContainer exten
             String group, boolean recursive)
             throws IOException {
         fs.setPermission(dir, permission);
-        try {
-            fs.setOwner(dir, null, group);
-        } catch (AccessControlException ace) {
-            LOG.warn("Error changing group of " + dir, ace);
-        }
         if (recursive) {
             for (FileStatus fileStatus : fs.listStatus(dir)) {
                 if (fileStatus.isDir()) {
-                    applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, recursive);
+                    applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, true);
                 } else {
                     fs.setPermission(fileStatus.getPath(), permission);
-                    try {
-                        fs.setOwner(dir, null, group);
-                    } catch (AccessControlException ace) {
-                        LOG.warn("Error changing group of " + dir, ace);
-                    }
                 }
             }
         }
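
The removal of the explicit setOwner/setPermission walk for HDFS leans on the
filesystem applying the configured umask when files are created. A small
sketch of how a umask shapes permissions; the property name is an assumption
here since it differs across Hadoop versions ("dfs.umaskmode" in 1.x,
"fs.permissions.umask-mode" in 2.x):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class UmaskSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set("fs.permissions.umask-mode", "022"); // assumed 2.x key
            FsPermission umask = FsPermission.getUMask(conf);
            // A file requested with 777 lands as 755 under a 022 umask.
            FsPermission requested = new FsPermission((short) 0777);
            System.out.println(requested.applyUMask(umask)); // rwxr-xr-x
        }
    }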
@@ -366,46 +350,88 @@ class FileOutputCommitterContainer exten
      * Move all of the files from the temp directory to the final location
      * @param fs the output file system
      * @param file the file to move
-     * @param src the source directory
-     * @param dest the target directory
+     * @param srcDir the source directory
+     * @param destDir the target directory
      * @param dryRun - a flag that simply tests if this move would succeed or not based
      *                 on whether other files exist where we're trying to copy
      * @throws java.io.IOException
      */
     private void moveTaskOutputs(FileSystem fs,
                                  Path file,
-                                 Path src,
-                                 Path dest, boolean dryRun) throws IOException {
-        if (fs.isFile(file)) {
-            Path finalOutputPath = getFinalPath(file, src, dest);
+                                 Path srcDir,
+                                 Path destDir, final boolean dryRun) throws IOException {
 
+        final Path finalOutputPath = getFinalPath(file, srcDir, destDir);
+        if (fs.isFile(file)) {
             if (dryRun){
-//        LOG.info("Testing if moving ["+file+"] to ["+finalOutputPath+"] would cause a problem");
-                if (fs.exists(finalOutputPath)){
-                    throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + ", duplicate publish possible.");
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Testing if moving file: [" + file + "] to ["
+                            + finalOutputPath + "] would cause a problem");
+                }
+                if (fs.exists(finalOutputPath)) {
+                    throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
+                            + ", duplicate publish not possible.");
+                }
+            } else {
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Moving file: [ " + file + "] to [" + finalOutputPath + "]");
                 }
-            }else{
-//        LOG.info("Moving ["+file+"] to ["+finalOutputPath+"]");
                 if (!fs.rename(file, finalOutputPath)) {
                     if (!fs.delete(finalOutputPath, true)) {
                         throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
                     }
                     if (!fs.rename(file, finalOutputPath)) {
-                        throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest);
+                        throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + finalOutputPath);
                     }
                 }
             }
         } else if(fs.getFileStatus(file).isDir()) {
-            FileStatus[] paths = fs.listStatus(file);
-            Path finalOutputPath = getFinalPath(file, src, dest);
-            if (!dryRun){
-                fs.mkdirs(finalOutputPath);
-            }
-            if (paths != null) {
-                for (FileStatus path : paths) {
-                    moveTaskOutputs(fs, path.getPath(), src, dest,dryRun);
+            FileStatus[] children = fs.listStatus(file);
+            if (children != null && children.length > 0) {
+                FileStatus firstChild = children[0];
+                if(firstChild.isDir()) {
+                    // If the first child is a directory, the rest will be directories too (per the HCatalog dir
+                    // structure), so recurse in that case
+                    for (FileStatus child : children) {
+                        moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+                    }
+                } else {
+
+
+                    if (!dryRun) {
+                        if (dynamicPartitioningUsed) {
+                            // Optimization: if the first child is a file, we have reached a leaf directory; move the
+                            // directory itself instead of moving each file under it. See HCATALOG-538
+
+                            final Path parentDir = finalOutputPath.getParent();
+                            // Create the directory
+                            fs.mkdirs(parentDir);
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Moving directory: " + file + " to " + parentDir);
+                            }
+                            if (!fs.rename(file, parentDir)) {
+                                final String msg = "Failed to move file: " + file + " to " + parentDir;
+                                LOG.error(msg);
+                                throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
+                            }
+                        } else {
+                            // When dynamic partitioning is not used we have to move each file individually
+                            for (FileStatus child : children) {
+                                moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+                            }
+                        }
+                    } else {
+                        if(fs.exists(finalOutputPath)) {
+                            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
+                                    + ", duplicate publish not possible.");
+                        }
+                    }
                 }
             }
+        } else {
+            // Should never happen
+            final String msg = "Unknown file type being asked to be moved, erroring out";
+            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
         }
     }
 
@@ -566,12 +592,12 @@ class FileOutputCommitterContainer exten
                 Path src = new Path(ptnRootLocation);
                 // check here for each dir we're copying out, to see if it
                 // already exists, error out if so
-                moveTaskOutputs(fs, src, src, tblPath,true);
-                moveTaskOutputs(fs, src, src, tblPath,false);
+                moveTaskOutputs(fs, src, src, tblPath, true);
+                moveTaskOutputs(fs, src, src, tblPath, false);
                 fs.delete(src, true);
                 try {
                     updateTableSchema(client, table, jobInfo.getOutputSchema());
-                    LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
+                    LOG.info("HAR is being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
                     client.add_partitions(partitionsToAdd);
                     partitionsAdded = partitionsToAdd;
                 } catch (Exception e){
@@ -587,17 +613,16 @@ class FileOutputCommitterContainer exten
 
             }else{
                 // no harProcessor, regular operation
-                // No duplicate partition publish case to worry about because we'll
-                // get a AlreadyExistsException here if so, and appropriately rollback
                 updateTableSchema(client, table, jobInfo.getOutputSchema());
-                LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
-                client.add_partitions(partitionsToAdd);
+                LOG.info("HAR not is not being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
                 partitionsAdded = partitionsToAdd;
                 if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
                     Path src = new Path(ptnRootLocation);
-                    moveTaskOutputs(fs, src, src, tblPath,false);
+                    moveTaskOutputs(fs, src, src, tblPath, true);
+                    moveTaskOutputs(fs, src, src, tblPath, false);
                     fs.delete(src, true);
                 }
+                client.add_partitions(partitionsToAdd);
             }
         } catch (Exception e) {
           if( partitionsAdded.size() > 0 ) {
@@ -653,8 +678,7 @@ class FileOutputCommitterContainer exten
             Path src;
             OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
             if (dynamicPartitioningUsed) {
-                src = new Path(getPartitionRootLocation(jobInfo.getLocation()
-                        .toString(), jobInfo.getTableInfo().getTable()
+                src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable()
                         .getPartitionKeysSize()));
             } else {
                 src = new Path(jobInfo.getLocation());
@@ -686,7 +710,7 @@ class FileOutputCommitterContainer exten
                 client.cancelDelegationToken(tokenStrForm);
             }
         } catch (MetaException e) {
-            LOG.warn("MetaException while cancelling delegation token.",e );
+            LOG.warn("MetaException while cancelling delegation token.", e);
         } catch (TException e) {
             LOG.warn("TException while cancelling delegation token.", e);
         } finally {
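
The heart of HCATALOG-538 is in moveTaskOutputs above: with dynamic
partitioning, a leaf partition directory is now renamed in a single filesystem
call instead of one rename per output file, which is what made the
100GB/300-partition loads fall over. A simplified, standalone sketch of that
recursion (helper and method names are illustrative, not the committed code):

    // Recurse through the partition tree; rename whole leaf directories.
    private void moveOutputs(FileSystem fs, Path dir, Path srcDir, Path destDir)
            throws IOException {
        FileStatus[] children = fs.listStatus(dir);
        if (children == null || children.length == 0) {
            return;
        }
        if (children[0].isDir()) {
            // Interior level of the partition tree: keep recursing.
            for (FileStatus child : children) {
                moveOutputs(fs, child.getPath(), srcDir, destDir);
            }
        } else {
            // Leaf partition directory: one rename moves all its files at once.
            Path target = getFinalPath(dir, srcDir, destDir); // illustrative helper
            fs.mkdirs(target.getParent());
            if (!fs.rename(dir, target)) {
                throw new IOException("Failed to move " + dir + " to " + target);
            }
        }
    }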

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java?rev=1407714&r1=1407713&r2=1407714&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java Sat Nov 10 01:14:44 2012
@@ -21,13 +21,14 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.util.Progressable;
@@ -89,4 +90,12 @@ public interface HCatHadoopShims {
   public InetSocketAddress getResourceManagerAddress(Configuration conf);
 
   public String getPropertyName(PropertyName name);
+
+  /** Checks whether a file is in the HDFS filesystem.
+   *
+   * @param fs the filesystem the path belongs to
+   * @param path the path of the file to check
+   * @return true if the file is in HDFS, false if the file is in another file system.
+   */
+  public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException;
 }
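
Callers pick up the version-appropriate implementation through the shim
holder, as FileOutputCommitterContainer does above. A short usage sketch
(the partition path is a hypothetical example; conf, perms, and grpName come
from the surrounding committer code):

    Path partPath = new Path("/warehouse/db/table/ds=2012-11-10");
    FileSystem fs = partPath.getFileSystem(conf);
    if (!HCatHadoopShims.Instance.get().isFileInHDFS(fs, partPath)) {
        // Only walk the tree setting group/permissions on non-HDFS filesystems.
        applyGroupAndPerms(fs, partPath, perms, grpName, true);
    }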