Posted to user@sqoop.apache.org by Tim Howe <th...@tripadvisor.com> on 2013/06/13 18:46:48 UTC

incremental import from database in direct mode

Hello all,

I was recently tasked with looking into a problem using Sqoop's
incremental import on our installation, namely that any imports after
the first would report success but the data would never appear.  A
temporary file was created on HDFS with the data but deleted upon
completion rather than being moved into place.
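
For context, these were incremental appends run in direct mode.  A
stripped-down invocation along these lines shows the shape of the job
(connection details, table, and column names here are placeholders, not
our actual setup):

    sqoop import \
      --connect jdbc:postgresql://dbhost/warehouse \
      --username etl \
      --table events \
      --direct \
      --incremental append \
      --check-column id \
      --last-value 12345 \
      --target-dir /user/hive/warehouse/events

The first such run populates the target directory as expected; it is the
second and later runs that lose their data.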

It turned out to be a conflict between the "direct mode" database
manager (for PostgreSQL, in this case) and "incremental mode" import.
Ordinarily Sqoop ends up creating files named part-m-nnnnn, where nnnnn
is an incrementing file partition number.  However, the direct-mode
importer creates files of the form data-nnnnn.  This poses a problem
because AppendUtils, which is used to move files into place at the end
of a direct import, only copies files which match that part-m-nnnnn
format and discards the rest.
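
To make the mismatch concrete, here is a tiny standalone sketch using
the same pattern string AppendUtils compiles (the class itself is just
for illustration and is not part of the patch):

    import java.util.regex.Pattern;

    public class NamingMismatch {
        // The pattern AppendUtils uses when moving imported files into place.
        private static final Pattern PART_PATTERN =
            Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");

        public static void main(String[] args) {
            // MapReduce-based imports produce names like this, which match:
            System.out.println(PART_PATTERN.matcher("part-m-00000").matches());  // true
            // The direct-mode importer produces names like this, which do not,
            // so the files are dropped instead of being moved into place:
            System.out.println(PART_PATTERN.matcher("data-00000").matches());    // false
        }
    }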

I've written a patch which causes direct imports to use the same naming
convention used elsewhere in Sqoop.  Attached please also find some changes to
AppendUtils which improve resiliency, especially when there are
multiple concurrent operations on the same table.  This patch is against
sqoop-1.3.0-cdh3u3 but seems to apply and build with minimal changes
across the whole 1.x series.
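
The gist of the AppendUtils change is to stop assuming a rename into the
target directory will succeed on the first attempt; roughly, it keeps
trying successive partition numbers until a rename goes through, which
makes it much harder for two concurrent appends to the same table to
clobber or silently drop each other's files.  A boiled-down sketch of
that idea (not the patch itself; the class and method names are
illustrative):

    import java.io.IOException;
    import java.text.NumberFormat;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RenameRetrySketch {
        // Move source into targetDir, retrying with the next partition
        // number whenever the chosen destination already exists.
        static Path moveWithRetry(FileSystem fs, Path source, Path targetDir,
                String baseName, int firstPartition) throws IOException {
            NumberFormat fmt = NumberFormat.getInstance();
            fmt.setMinimumIntegerDigits(5);   // same zero padding as part-m-nnnnn
            fmt.setGroupingUsed(false);

            int partition = firstPartition;
            Path dest;
            do {
                dest = new Path(targetDir, baseName + fmt.format(partition++));
            } while (fs.exists(dest) || !fs.rename(source, dest));
            return dest;
        }
    }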

Please let me know if anyone finds this useful or if you have any
further suggestions.  In particular, I am curious where the
"part-m-nnnnn" naming comes from and whether the "-m" signifies anything.
I hunted around for the code which creates those files, but with no luck.

Thanks and regards,
-- 
Tim Howe
Data Warehouse
TripAdvisor

Re: incremental import from database in direct mode

Posted by Jarek Jarcec Cecho <ja...@apache.org>.
Awesome, thank you!

Jarcec

On Thu, Jun 13, 2013 at 02:22:09PM -0400, Tim Howe wrote:
> On 06/13/13 12:52, Jarek Jarcec Cecho wrote:
> > Would you mind creating a JIRA and attaching your patch there?
> 
> https://issues.apache.org/jira/browse/SQOOP-1078
> 
> -- 
> Tim Howe
> Data Warehouse
> TripAdvisor

Re: incremental import from database in direct mode

Posted by Tim Howe <th...@tripadvisor.com>.
On 06/13/13 12:52, Jarek Jarcec Cecho wrote:
> Would you mind creating a JIRA and attaching your patch there?

https://issues.apache.org/jira/browse/SQOOP-1078

-- 
Tim Howe
Data Warehouse
TripAdvisor

Re: incremental import from database in direct mode

Posted by Jarek Jarcec Cecho <ja...@apache.org>.
Hi Tim,
thank you very much for reporting the bug and providing a fix for it, greatly appreciated! Would you mind creating a JIRA [1] and attaching your patch there? Unfortunately due to legal restrictions we can't accept "email" patches.

Jarcec

Links:
1: https://issues.apache.org/jira/browse/SQOOP

On Thu, Jun 13, 2013 at 12:46:48PM -0400, Tim Howe wrote:
> Hello all,
> 
> I was recently tasked with looking into a problem using Sqoop's
> incremental import on our installation, namely that any imports after
> the first would report success but the data would never appear.  A
> temporary file was created on HDFS with the data but deleted upon
> completion rather than being moved into place.
> 
> It turned out to be a conflict between the "direct mode" database
> manager (for PostgreSQL, in this case) and "incremental mode" import.
> Ordinarily Sqoop ends up creating files named part-m-nnnnn, where nnnnn
> is an incrementing file partition number.  However, the direct-mode
> importer creates files of the form data-nnnnn.  This poses a problem
> because AppendUtils, which is used to move files into place at the end
> of a direct import, only copies files which match that part-m-nnnnn
> format and discards the rest.
> 
> I've written a patch which causes direct imports to use the same naming
> convention used elsewhere in Sqoop.  Attached please also find some changes to
> AppendUtils which improve resiliency, especially when there are
> multiple concurrent operations on the same table.  This patch is against
> sqoop-1.3.0-cdh3u3 but seems to apply and build with minimal changes
> across the whole 1.x series.
> 
> Please let me know if anyone finds this useful or if you have any
> further suggestions.  In particular, I am curious where the
> "part-m-nnnnn" naming comes from and whether the "-m" signifies anything.
> I hunted around for the code which creates those files, but with no luck.
> 
> Thanks and regards,
> -- 
> Tim Howe
> Data Warehouse
> TripAdvisor

> diff -ru unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java
> --- unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java	2012-01-26 13:42:29.000000000 -0500
> +++ devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java	2013-06-13 11:31:22.476128082 -0400
> @@ -51,6 +51,8 @@
>    private static final String FILEPART_SEPARATOR = "-";
>    private static final String FILEEXT_SEPARATOR = ".";
>  
> +  private static final Pattern DATA_PART_PATTERN = Pattern.compile("part.*-([0-9]{" + PARTITION_DIGITS + "}+).*");
> +
>    private ImportJobContext context = null;
>  
>    public AppendUtils(ImportJobContext context) {
> @@ -118,11 +120,10 @@
>      int nextPartition = 0;
>      FileStatus[] existingFiles = fs.listStatus(targetDir);
>      if (existingFiles != null && existingFiles.length > 0) {
> -      Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
>        for (FileStatus fileStat : existingFiles) {
>          if (!fileStat.isDir()) {
>            String filename = fileStat.getPath().getName();
> -          Matcher mat = patt.matcher(filename);
> +          Matcher mat = DATA_PART_PATTERN.matcher(filename);
>            if (mat.matches()) {
>              int thisPart = Integer.parseInt(mat.group(1));
>              if (thisPart >= nextPartition) {
> @@ -142,52 +143,94 @@
>    }
>  
>    /**
> -   * Move files from source to target using a specified starting partition.
> +   * Move selected files from source to target using a specified starting partition.
> +   *
> +   * Directories are moved without restriction.  Note that the serial
> +   * number of directories bears no relation to the file partition
> +   * numbering.
>     */
>    private void moveFiles(FileSystem fs, Path sourceDir, Path targetDir,
>        int partitionStart) throws IOException {
>  
> -    NumberFormat numpart = NumberFormat.getInstance();
> -    numpart.setMinimumIntegerDigits(PARTITION_DIGITS);
> -    numpart.setGroupingUsed(false);
> -    Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
> -    FileStatus[] tempFiles = fs.listStatus(sourceDir);
> +    /* list files in the source dir and check for errors */
> +
> +    FileStatus[] sourceFiles = fs.listStatus(sourceDir);
>  
> -    if (null == tempFiles) {
> +    if (null == sourceFiles) {
>        // If we've already checked that the dir exists, and now it can't be
>        // listed, this is a genuine error (permissions, fs integrity, or other).
>        throw new IOException("Could not list files from " + sourceDir);
>      }
>  
> -    // Move and rename files & directories from temporary to target-dir thus
> -    // appending file's next partition
> -    for (FileStatus fileStat : tempFiles) {
> -      if (!fileStat.isDir()) {
> -        // Move imported data files
> -        String filename = fileStat.getPath().getName();
> -        Matcher mat = patt.matcher(filename);
> -        if (mat.matches()) {
> -          String name = getFilename(filename);
> -          String fileToMove = name.concat(numpart.format(partitionStart++));
> -          String extension = getFileExtension(filename);
> -          if (extension != null) {
> -            fileToMove = fileToMove.concat(extension);
> -          }
> -          LOG.debug("Filename: " + filename + " repartitioned to: "
> -              + fileToMove);
> -          fs.rename(fileStat.getPath(), new Path(targetDir, fileToMove));
> -        }
> -      } else {
> -        // Move directories (_logs & any other)
> -        String dirName = fileStat.getPath().getName();
> -        Path path = new Path(targetDir, dirName);
> -        int dirNumber = 0;
> -        while (fs.exists(path)) {
> -          path = new Path(targetDir, dirName.concat("-").concat(
> -              numpart.format(dirNumber++)));
> +
> +    /* state used throughout the entire move operation */
> +
> +    // pad the data partition number thusly
> +    NumberFormat partFormat = NumberFormat.getInstance();
> +    partFormat.setMinimumIntegerDigits(PARTITION_DIGITS);
> +    partFormat.setGroupingUsed(false);
> +
> +    // where the data partitioning is currently at
> +    int dataPart = partitionStart;
> +
> +
> +    /* loop through all top-level files and copy matching ones */
> +
> +    for (FileStatus fileStatus : sourceFiles) {
> +      String        sourceFilename = fileStatus.getPath().getName();
> +      StringBuilder destFilename   = new StringBuilder();
> +
> +      if (fileStatus.isDir()) {    // move all subdirectories
> +        // pass target dir as initial dest to prevent nesting inside preexisting dir
> +        if (fs.rename(fileStatus.getPath(), targetDir)) {    
> +          LOG.debug("Directory: " + sourceFilename + " renamed to: " + sourceFilename);
> +        } else {
> +          int dirNumber = 0;
> +          Path destPath;
> +          do {
> +            // clear the builder in case this isn't the first iteration
> +            destFilename.setLength(0);
> +
> +            // name-nnnnn?
> +            destFilename
> +              .append(sourceFilename)
> +              .append("-")
> +              .append(partFormat.format(dirNumber++));
> +
> +            destPath = new Path(targetDir, destFilename.toString());
> +            if (fs.exists(destPath))
> +              continue;
> +
> +            /*
> +             * There's still a race condition right here if an
> +             * identically-named directory is created concurrently.
> +             * It can be avoided by creating a parent dir for all
> +             * migrated dirs, or by an intermediate rename.
> +             */
> +
> +          } while (!fs.rename(fileStatus.getPath(), destPath));
> +
> +          LOG.debug("Directory: " + sourceFilename + " renamed to: " + destPath.getName());
>          }
> -        LOG.debug("Directory: " + dirName + " renamed to: " + path.getName());
> -        fs.rename(fileStat.getPath(), path);
> +      } else if (DATA_PART_PATTERN.matcher(sourceFilename).matches()) {    // move only matching top-level files
> +        do {
> +          // clear the builder in case this isn't the first iteration
> +          destFilename.setLength(0);
> +
> +          // name-nnnnn
> +          destFilename
> +            .append(getFilename(sourceFilename))
> +            .append(partFormat.format(dataPart++));
> +
> +          // .ext?
> +          String extension = getFileExtension(sourceFilename);
> +          if (extension != null)
> +            destFilename.append(getFileExtension(sourceFilename));
> +        } while (!fs.rename(fileStatus.getPath(), new Path(targetDir, destFilename.toString())));
> +
> +        LOG.debug("Filename: " + sourceFilename + " repartitioned to: " + destFilename.toString());
> +      } else {    // ignore everything else
> +        LOG.debug("Filename: " + sourceFilename + " ignored");
>        }
>      }
>    }
> diff -ru unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java
> --- unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java	2012-01-26 13:42:29.000000000 -0500
> +++ devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java	2013-06-13 11:31:22.475128082 -0400
> @@ -88,7 +88,7 @@
>  
>      // This Writer will be closed by the caller.
>      return new SplittableBufferedWriter(
> -        new SplittingOutputStream(conf, destDir, "data-",
> +        new SplittingOutputStream(conf, destDir, "part-m-",
>          options.getDirectSplitSize(), getCodec(conf, options)));
>    }
>