svn commit: r1407714 - in /incubator/hcatalog/branches/branch-0.4: ./
shims/src/20/java/org/apache/hcatalog/shims/
shims/src/23/java/org/apache/hcatalog/shims/
src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/shims/
Author: toffer
Date: Sat Nov 10 01:14:44 2012
New Revision: 1407714
URL: http://svn.apache.org/viewvc?rev=1407714&view=rev
Log:
backported from trunk: HCATALOG-538 HCatalogStorer fails for 100GB of data with dynamic partitioning, number of partition is 300 (amalakar via toffer)
Modified:
incubator/hcatalog/branches/branch-0.4/CHANGES.txt
incubator/hcatalog/branches/branch-0.4/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
incubator/hcatalog/branches/branch-0.4/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1407714&r1=1407713&r2=1407714&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Sat Nov 10 01:14:44 2012
@@ -38,6 +38,8 @@ Trunk (unreleased changes)
HCAT-427 Document storage-based authorization (lefty via gates)
IMPROVEMENTS
+ HCAT-538 HCatalogStorer fails for 100GB of data with dynamic partitioning, number of partition is 300 (amalakar via toffer)
+
HCAT-492 Document CTAS workaround for Hive with JSON serde (lefty via khorgath)
HCAT-389 hcat_ping (script to check if HCatalog server is running/reachable) (mithun via khorgath)
Modified: incubator/hcatalog/branches/branch-0.4/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java?rev=1407714&r1=1407713&r2=1407714&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java (original)
+++ incubator/hcatalog/branches/branch-0.4/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java Sat Nov 10 01:14:44 2012
@@ -5,6 +5,8 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
@@ -125,4 +127,10 @@ public class HCatHadoopShims20S implemen
return "";
}
+
+ @Override
+ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+ // In Hadoop 1.x the file system's URI is sufficient to determine which file system a path lives on
+ return "hdfs".equals(fs.getUri().getScheme());
+ }
}
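For context on the 1.x implementation above: without viewfs there is no client-side mount indirection, so the FileSystem's own URI scheme is enough to tell where a path lives. A minimal sketch of that behavior (the namenode address is hypothetical, and actually running this needs a reachable cluster):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class SchemeCheckDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Hypothetical namenode address, for illustration only.
            conf.set("fs.default.name", "hdfs://namenode.example.com:8020");
            FileSystem hdfs = FileSystem.get(conf);
            System.out.println(hdfs.getUri().getScheme());  // "hdfs"
            FileSystem local = FileSystem.getLocal(conf);
            System.out.println(local.getUri().getScheme()); // "file"
        }
    }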
Modified: incubator/hcatalog/branches/branch-0.4/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java?rev=1407714&r1=1407713&r2=1407714&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java (original)
+++ incubator/hcatalog/branches/branch-0.4/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java Sat Nov 10 01:14:44 2012
@@ -33,6 +33,9 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.util.Progressable;
import org.apache.pig.ResourceSchema;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.net.NetUtils;
@@ -118,4 +121,11 @@ public class HCatHadoopShims23 implement
return "";
}
+
+ @Override
+ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+ // With viewfs we need to look up where the actual file lives to know which file system is in use.
+ // resolvePath is a reliable way of determining which file system a path is on.
+ return "hdfs".equals(fs.resolvePath(path).toUri().getScheme());
+ }
}
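Why the 0.23 shim needs resolvePath rather than the file system URI: with federation a client can mount several namespaces behind a single viewfs:// URI, so the scheme of fs.getUri() no longer identifies the backing store. A rough sketch, assuming a cluster configured with a viewfs mount table over HDFS and a hypothetical, existing path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ViewFsResolveDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration(); // assumed to carry a viewfs mount table
            FileSystem fs = FileSystem.get(conf);
            Path p = new Path("/user/alice/part=1/datafile"); // hypothetical; must exist
            // The client-side scheme hides the backing store:
            System.out.println(fs.getUri().getScheme());               // "viewfs"
            // resolvePath follows the mount table to the real file system:
            System.out.println(fs.resolvePath(p).toUri().getScheme()); // "hdfs" if HDFS-backed
        }
    }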
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1407714&r1=1407713&r2=1407714&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Sat Nov 10 01:14:44 2012
@@ -35,12 +35,9 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.HCatMapRedUtil;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.security.AccessControlException;
import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
@@ -54,11 +51,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedWriter;
-import java.io.FileWriter;
import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.Writer;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
@@ -176,18 +169,16 @@ class FileOutputCommitterContainer exten
OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
if (getOutputDirMarking(jobContext.getConfiguration())) {
Path outputPath = new Path(jobInfo.getLocation());
- if (outputPath != null) {
- FileSystem fileSys = outputPath.getFileSystem(jobContext
- .getConfiguration());
- // create a file in the folder to mark it
- if (fileSys.exists(outputPath)) {
- Path filePath = new Path(outputPath,
- SUCCEEDED_FILE_NAME);
- if (!fileSys.exists(filePath)) { // may have been
- // created by
- // baseCommitter.commitJob()
- fileSys.create(filePath).close();
- }
+ FileSystem fileSys = outputPath.getFileSystem(jobContext
+ .getConfiguration());
+ // create a file in the folder to mark it
+ if (fileSys.exists(outputPath)) {
+ Path filePath = new Path(outputPath,
+ SUCCEEDED_FILE_NAME);
+ if (!fileSys.exists(filePath)) { // may have been
+ // created by
+ // baseCommitter.commitJob()
+ fileSys.create(filePath).close();
}
}
}
@@ -266,7 +257,10 @@ class FileOutputCommitterContainer exten
partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
}
// Apply the group and permissions to the leaf partition and files.
- applyGroupAndPerms(fs, partPath, perms, grpName, true);
+ // No need to bother in the HDFS case, as permissions are already taken care of by setting the umask
+ if(!HCatHadoopShims.Instance.get().isFileInHDFS(fs, partPath)) {
+ applyGroupAndPerms(fs, partPath, perms, grpName, true);
+ }
if (dynamicPartitioningUsed){
String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table,partKVs);
if (harProcessor.isEnabled()){
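The umask remark in the hunk above refers to the mode HCatalog derives from the table directory when the job is set up: on HDFS, files the tasks create already carry the desired permissions, so a recursive chmod after commit is redundant. A hedged sketch of that relationship (names hypothetical, not the committed code):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.mapred.JobConf;

    public class UmaskSketch {
        // Align the job's umask with the table directory's permissions so that
        // files written to HDFS are created with the right mode up front.
        static void alignUmask(FileSystem fs, Path tblPath, JobConf jobConf) throws IOException {
            FsPermission desired = fs.getFileStatus(tblPath).getPermission();
            // A umask lists the permission bits to strip, hence the complement.
            FsPermission umask = new FsPermission((short) (~desired.toShort() & 0777));
            FsPermission.setUMask(jobConf, umask);
        }
    }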
@@ -287,22 +281,12 @@ class FileOutputCommitterContainer exten
String group, boolean recursive)
throws IOException {
fs.setPermission(dir, permission);
- try {
- fs.setOwner(dir, null, group);
- } catch (AccessControlException ace) {
- LOG.warn("Error changing group of " + dir, ace);
- }
if (recursive) {
for (FileStatus fileStatus : fs.listStatus(dir)) {
if (fileStatus.isDir()) {
- applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, recursive);
+ applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, true);
} else {
fs.setPermission(fileStatus.getPath(), permission);
- try {
- fs.setOwner(dir, null, group);
- } catch (AccessControlException ace) {
- LOG.warn("Error changing group of " + dir, ace);
- }
}
}
}
@@ -366,46 +350,88 @@ class FileOutputCommitterContainer exten
* Move all of the files from the temp directory to the final location
* @param fs the output file system
* @param file the file to move
- * @param src the source directory
- * @param dest the target directory
+ * @param srcDir the source directory
+ * @param destDir the target directory
* @param dryRun - a flag that simply tests if this move would succeed or not based
* on whether other files exist where we're trying to copy
* @throws java.io.IOException
*/
private void moveTaskOutputs(FileSystem fs,
Path file,
- Path src,
- Path dest, boolean dryRun) throws IOException {
- if (fs.isFile(file)) {
- Path finalOutputPath = getFinalPath(file, src, dest);
+ Path srcDir,
+ Path destDir, final boolean dryRun) throws IOException {
+ final Path finalOutputPath = getFinalPath(file, srcDir, destDir);
+ if (fs.isFile(file)) {
if (dryRun){
-// LOG.info("Testing if moving ["+file+"] to ["+finalOutputPath+"] would cause a problem");
- if (fs.exists(finalOutputPath)){
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + ", duplicate publish possible.");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Testing if moving file: [" + file + "] to ["
+ + finalOutputPath + "] would cause a problem");
+ }
+ if (fs.exists(finalOutputPath)) {
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
+ + ", duplicate publish not possible.");
+ }
+ } else {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Moving file: [ " + file + "] to [" + finalOutputPath + "]");
}
- }else{
-// LOG.info("Moving ["+file+"] to ["+finalOutputPath+"]");
if (!fs.rename(file, finalOutputPath)) {
if (!fs.delete(finalOutputPath, true)) {
throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
}
if (!fs.rename(file, finalOutputPath)) {
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest);
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + finalOutputPath);
}
}
}
} else if(fs.getFileStatus(file).isDir()) {
- FileStatus[] paths = fs.listStatus(file);
- Path finalOutputPath = getFinalPath(file, src, dest);
- if (!dryRun){
- fs.mkdirs(finalOutputPath);
- }
- if (paths != null) {
- for (FileStatus path : paths) {
- moveTaskOutputs(fs, path.getPath(), src, dest,dryRun);
+ FileStatus[] children = fs.listStatus(file);
+ if (children != null && children.length > 0) {
+ FileStatus firstChild = children[0];
+ if(firstChild.isDir()) {
+ // If the first child is a directory, the rest are directories too, per the HCatalog
+ // directory structure; recurse in that case
+ for (FileStatus child : children) {
+ moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+ }
+ } else {
+ if (!dryRun) {
+ if (dynamicPartitioningUsed) {
+ // Optimization: if the first child is a file, we have reached a leaf partition directory;
+ // move the directory itself instead of moving each file under it. See HCATALOG-538
+
+ final Path parentDir = finalOutputPath.getParent();
+ // Create the directory
+ fs.mkdirs(parentDir);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Moving directory: " + file + " to " + parentDir);
+ }
+ if (!fs.rename(file, parentDir)) {
+ final String msg = "Failed to move file: " + file + " to " + parentDir;
+ LOG.error(msg);
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
+ }
+ } else {
+ // Without dynamic partitioning we have to move each file individually
+ for (FileStatus child : children) {
+ moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+ }
+ }
+ } else {
+ if(fs.exists(finalOutputPath)) {
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
+ + ", duplicate publish not possible.");
+ }
+ }
}
}
+ } else {
+ // Should never happen
+ final String msg = "Unknown file type being asked to be moved, erroring out";
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
}
}
@@ -566,12 +592,12 @@ class FileOutputCommitterContainer exten
Path src = new Path(ptnRootLocation);
// check here for each dir we're copying out, to see if it
// already exists, error out if so
- moveTaskOutputs(fs, src, src, tblPath,true);
- moveTaskOutputs(fs, src, src, tblPath,false);
+ moveTaskOutputs(fs, src, src, tblPath, true);
+ moveTaskOutputs(fs, src, src, tblPath, false);
fs.delete(src, true);
try {
updateTableSchema(client, table, jobInfo.getOutputSchema());
- LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
+ LOG.info("HAR is being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
client.add_partitions(partitionsToAdd);
partitionsAdded = partitionsToAdd;
} catch (Exception e){
@@ -587,17 +613,16 @@ class FileOutputCommitterContainer exten
}else{
// no harProcessor, regular operation
- // No duplicate partition publish case to worry about because we'll
- // get a AlreadyExistsException here if so, and appropriately rollback
updateTableSchema(client, table, jobInfo.getOutputSchema());
- LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
- client.add_partitions(partitionsToAdd);
+ LOG.info("HAR not is not being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
partitionsAdded = partitionsToAdd;
if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
Path src = new Path(ptnRootLocation);
- moveTaskOutputs(fs, src, src, tblPath,false);
+ moveTaskOutputs(fs, src, src, tblPath, true);
+ moveTaskOutputs(fs, src, src, tblPath, false);
fs.delete(src, true);
}
+ client.add_partitions(partitionsToAdd);
}
} catch (Exception e) {
if( partitionsAdded.size() > 0 ) {
@@ -653,8 +678,7 @@ class FileOutputCommitterContainer exten
Path src;
OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
if (dynamicPartitioningUsed) {
- src = new Path(getPartitionRootLocation(jobInfo.getLocation()
- .toString(), jobInfo.getTableInfo().getTable()
+ src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable()
.getPartitionKeysSize()));
} else {
src = new Path(jobInfo.getLocation());
@@ -686,7 +710,7 @@ class FileOutputCommitterContainer exten
client.cancelDelegationToken(tokenStrForm);
}
} catch (MetaException e) {
- LOG.warn("MetaException while cancelling delegation token.",e );
+ LOG.warn("MetaException while cancelling delegation token.", e);
} catch (TException e) {
LOG.warn("TException while cancelling delegation token.", e);
} finally {
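To summarize the moveTaskOutputs rewrite above: with dynamic partitioning the commit step previously issued one rename per data file, which is what failed at the scale reported in HCATALOG-538 (100GB, ~300 partitions); the new code moves each leaf partition directory with a single rename. A condensed, hypothetical sketch of the strategy (not the committed method):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class LeafDirMoveSketch {
        static void move(FileSystem fs, Path dir, Path finalOutputPath) throws IOException {
            FileStatus[] children = fs.listStatus(dir);
            if (children != null && children.length > 0 && !children[0].isDir()) {
                // Leaf partition directory: one rename instead of one per file.
                Path destParent = finalOutputPath.getParent();
                fs.mkdirs(destParent);
                // Renaming into an existing directory places `dir` inside it,
                // i.e. the result is destParent/dir.getName().
                if (!fs.rename(dir, destParent)) {
                    throw new IOException("Failed to move " + dir + " to " + destParent);
                }
            }
            // (directory children are recursed into instead, as in the patch)
        }
    }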
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java?rev=1407714&r1=1407713&r2=1407714&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java Sat Nov 10 01:14:44 2012
@@ -21,13 +21,14 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.util.Progressable;
@@ -89,4 +90,12 @@ public interface HCatHadoopShims {
public InetSocketAddress getResourceManagerAddress(Configuration conf);
public String getPropertyName(PropertyName name);
+
+ /** Checks whether a file lives in HDFS.
+ *
+ * @param fs the file system that owns the path
+ * @param path the path of the file to check
+ * @return true if the file is in HDFS, false if it is in another file system.
+ */
+ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException;
}
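A quick way to exercise the new shim method from a test is against the local file system, where it should return false; note that the 0.23 implementation's resolvePath requires the path to exist. A minimal sketch, assuming the HCatalog and Hadoop jars are on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hcatalog.shims.HCatHadoopShims;

    public class IsFileInHDFSDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem localFs = FileSystem.getLocal(conf);
            Path p = new Path("/tmp/hcat-shim-demo.txt");
            localFs.create(p).close(); // resolvePath needs an existing path on 0.23
            // The local scheme is "file", not "hdfs", so this prints false:
            System.out.println(HCatHadoopShims.Instance.get().isFileInHDFS(localFs, p));
            localFs.delete(p, false);
        }
    }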