You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/08/20 08:21:35 UTC

svn commit: r1374923 - in /sqoop/trunk/src: docs/user/ java/org/apache/sqoop/manager/ java/org/apache/sqoop/mapreduce/ java/org/apache/sqoop/util/ test/com/cloudera/sqoop/manager/

Author: jarcec
Date: Mon Aug 20 06:21:34 2012
New Revision: 1374923

URL: http://svn.apache.org/viewvc?rev=1374923&view=rev
Log:
SQOOP-390. PostgreSQL connector for direct export with pg_bulkload.

(Masatake Iwasaki via Jarek Jarcec Cecho)

Added:
    sqoop/trunk/src/docs/user/connectors.txt
    sqoop/trunk/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
    sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressReducer.java
    sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
    sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java
    sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java
    sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ProgressThread.java
    sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SqoopReducer.java
    sqoop/trunk/src/java/org/apache/sqoop/util/PostgreSQLUtils.java
    sqoop/trunk/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
Modified:
    sqoop/trunk/src/docs/user/SqoopUserGuide.txt
    sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
    sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java

Modified: sqoop/trunk/src/docs/user/SqoopUserGuide.txt
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/docs/user/SqoopUserGuide.txt?rev=1374923&r1=1374922&r2=1374923&view=diff
==============================================================================
--- sqoop/trunk/src/docs/user/SqoopUserGuide.txt (original)
+++ sqoop/trunk/src/docs/user/SqoopUserGuide.txt Mon Aug 20 06:21:34 2012
@@ -72,8 +72,9 @@ include::version.txt[]
 
 include::compatibility.txt[]
 
+include::connectors.txt[]
+
 include::support.txt[]
 
 include::troubleshooting.txt[]
 
-

Added: sqoop/trunk/src/docs/user/connectors.txt
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/docs/user/connectors.txt?rev=1374923&view=auto
==============================================================================
--- sqoop/trunk/src/docs/user/connectors.txt (added)
+++ sqoop/trunk/src/docs/user/connectors.txt Mon Aug 20 06:21:34 2012
@@ -0,0 +1,138 @@
+
+////
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+////
+
+
+Notes for specific connectors
+-----------------------------
+
+pg_bulkload connector
+~~~~~~~~~~~~~~~~~~~~~
+
+Purpose
+^^^^^^^
+pg_bulkload connector is a direct connector for exporting data into PostgreSQL.
+This connector uses
+http://pgbulkload.projects.postgresql.org/index.html[pg_bulkload].
+Users benefit from functionality of pg_bulkload such as
+fast exports bypassing shared bufferes and WAL,
+flexible error records handling,
+and ETL feature with filter functions.
+
+Requirements
+^^^^^^^^^^^^
+pg_bulkload connector requires following conditions for export job execution:
+
+* The link:http://pgbulkload.projects.postgresql.org/index.html[pg_bulkload]
+  must be installed on DB server and all slave nodes.
+  RPM for RedHat or CentOS is available in then
+  link:http://pgfoundry.org/frs/?group_id=1000261[download page].
+* The link:http://jdbc.postgresql.org/index.html[PostgreSQL JDBC]
+  is required on client node.
+* Superuser role of PostgreSQL database is required for execution of pg_bulkload.
+
+Syntax
+^^^^^^
+Use +--connection-manager+ option to specify connection manager classname.
+----
+$ sqoop export (generic-args) --connection-manager org.apache.sqoop.manager.PGBulkloadManager (export-args)
+$ sqoop-export (generic-args) --connection-manager org.apache.sqoop.manager.PGBulkloadManager (export-args)
+----
+
+This connector supports export arguments shown below.
+
+.Supported export control arguments:
+[grid="all"]
+`----------------------------------------`---------------------------------------
+Argument                                 Description
+---------------------------------------------------------------------------------
++\--export-dir <dir>+                    HDFS source path for the export
++-m,\--num-mappers <n>+                  Use 'n' map tasks to export in\
+                                         parallel
++\--table <table-name>+                  Table to populate
++\--input-null-string <null-string>+     The string to be interpreted as\
+                                         null for string columns
++\--clear-staging-table+                 Indicates that any data present in\
+                                         the staging table can be deleted.
+---------------------------------------------------------------------------------
+
+There are additional configuration for pg_bulkload execution
+specified via Hadoop Configuration properties
+which can be given with +-D <property=value>+ option.
+Because Hadoop Configuration properties are generic arguments of the sqoop,
+it must preceed any export control arguments.
+
+.Supported export control properties:
+[grid="all"]
+`-----------------------------`----------------------------------------------
+Property                      Description
+-----------------------------------------------------------------------------
+mapred.reduce.tasks           Number of reduce tasks for staging. \
+                              The defalt value is 1. \
+                              Each tasks do staging in a single transaction.
+pgbulkload.bin                Path of the pg_bulkoad binary \
+                              installed on each slave nodes.
+pgbulkload.check.constraints  Specify whether CHECK constraints are checked \
+                              during the loading. \
+                              The default value is YES.
+pgbulkload.parse.errors       The maximum mumber of ingored records \
+                              that cause errors during parsing, \
+                              encoding, filtering, constraints checking, \
+                              and data type conversion. \
+                              Error records are recorded \
+                              in the PARSE BADFILE.  \
+                              The default value is INFINITE.
+pgbulkload.duplicate.errors   Number of ingored records \
+                              that violate unique constraints. \
+                              Duplicated records are recorded in the \
+                              DUPLICATE BADFILE on DB server. \
+                              The default value is INFINITE.
+pgbulkload.filter             Specify the filter function \
+                              to convert each row in the input file.  \
+                              See the pg_bulkload documentation to know \
+                              how to write FILTER functions.
+-----------------------------------------------------------------------------
+
+Here is a example of complete command line.
+----
+$ sqoop export \
+    -Dmapred.reduce.tasks=2
+    -Dpgbulkload.bin="/usr/local/bin/pg_bulkload" \
+    -Dpgbulkload.input.field.delim=$'\t' \
+    -Dpgbulkload.check.constraints="YES" \
+    -Dpgbulkload.parse.errors="INFINITE" \
+    -Dpgbulkload.duplicate.errors="INFINITE" \
+    --connect jdbc:postgresql://pgsql.example.net:5432/sqooptest \
+    --connection-manager org.apache.sqoop.manager.PGBulkloadManager \
+    --table test --username sqooptest --export-dir=/test -m 2
+----
+
+Data Staging
+^^^^^^^^^^^^
+Each map tasks of pg_bulkload connector's export job create
+their own staging table on the fly.
+The Name of staging tables is decided based on the destination table
+and the task attempt ids.
+For example, the name of staging table for the "test" table is like
++test_attempt_1345021837431_0001_m_000000_0+ .
+
+Staging tables are automatically dropped if tasks successfully complete
+or map tasks fail.
+When reduce task fails,
+staging table for the task are left for manual retry and
+users must take care of it.

Modified: sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java?rev=1374923&r1=1374922&r2=1374923&view=diff
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java (original)
+++ sqoop/trunk/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java Mon Aug 20 06:21:34 2012
@@ -32,6 +32,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.util.PostgreSQLUtils;
 
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.io.SplittableBufferedWriter;
@@ -293,26 +294,6 @@ public class DirectPostgresqlManager
     return tempFile.toString();
   }
 
-  /** Write the user's password to a file that is chmod 0600.
-      @return the filename.
-    */
-  private String writePasswordFile(String password) throws IOException {
-
-    String tmpDir = options.getTempDir();
-    File tempFile = File.createTempFile("pgpass", ".pgpass", new File(tmpDir));
-    LOG.debug("Writing password to tempfile: " + tempFile);
-
-    // Make sure it's only readable by the current user.
-    DirectImportUtils.setFilePermissions(tempFile, "0600");
-
-    // Actually write the password data into the file.
-    BufferedWriter w = new BufferedWriter(
-        new OutputStreamWriter(new FileOutputStream(tempFile)));
-    w.write("*:*:*:*:" + password);
-    w.close();
-    return tempFile.toString();
-  }
-
   // TODO(aaron): Refactor this method to be much shorter.
   // CHECKSTYLE:OFF
   @Override
@@ -380,7 +361,8 @@ public class DirectPostgresqlManager
         args.add(username);
         String password = options.getPassword();
         if (null != password) {
-          passwordFilename = writePasswordFile(password);
+          passwordFilename =
+            PostgreSQLUtils.writePasswordFile(options.getTempDir(), password);
           // Need to send PGPASSFILE environment variable specifying
           // location of our postgres file.
           envp.add("PGPASSFILE=" + passwordFilename);

Added: sqoop/trunk/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/manager/PGBulkloadManager.java?rev=1374923&view=auto
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/manager/PGBulkloadManager.java (added)
+++ sqoop/trunk/src/java/org/apache/sqoop/manager/PGBulkloadManager.java Mon Aug 20 06:21:34 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.util.ExportException;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.mapreduce.ExportInputFormat;
+import org.apache.sqoop.mapreduce.PGBulkloadExportJob;
+
+
+
+/**
+ * Manages connections to Postgresql databases.
+ */
+public class PGBulkloadManager extends PostgresqlManager {
+
+  public static final Log LOG =
+      LogFactory.getLog(PGBulkloadManager.class.getName());
+
+
+  public PGBulkloadManager(final SqoopOptions opts) {
+    super(opts, true);
+  }
+
+
+  @Override
+  public void exportTable(ExportJobContext context)
+      throws IOException, ExportException {
+    context.setConnManager(this);
+    options.setStagingTableName(null);
+    PGBulkloadExportJob jobbase =
+        new PGBulkloadExportJob(context,
+                                null,
+                                ExportInputFormat.class,
+                                NullOutputFormat.class);
+    jobbase.runExport();
+  }
+
+
+  @Override
+  public boolean supportsStagingForExport() {
+    return false;
+  }
+
+}

Modified: sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java?rev=1374923&r1=1374922&r2=1374923&view=diff
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java (original)
+++ sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java Mon Aug 20 06:21:34 2012
@@ -21,7 +21,6 @@ package org.apache.sqoop.mapreduce;
 import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Identity mapper that continuously reports progress via a background thread.
@@ -32,25 +31,6 @@ public class AutoProgressMapper<KEYIN, V
   public static final Log LOG = LogFactory.getLog(
       AutoProgressMapper.class.getName());
 
-  /**
-   * Total number of millis for which progress will be reported by the
-   * auto-progress thread. If this is zero, then the auto-progress thread will
-   * never voluntarily exit.
-   */
-  private int maxProgressPeriod;
-
-  /**
-   * Number of milliseconds to sleep for between loop iterations. Must be less
-   * than report interval.
-   */
-  private int sleepInterval;
-
-  /**
-   * Number of milliseconds between calls to Reporter.progress().
-   * Should be a multiple of the sleepInterval.
-   */
-  private int reportInterval;
-
   public static final String MAX_PROGRESS_PERIOD_KEY =
       "sqoop.mapred.auto.progress.max";
   public static final String SLEEP_INTERVAL_KEY =
@@ -67,112 +47,14 @@ public class AutoProgressMapper<KEYIN, V
   // Disable max progress, by default.
   public static final int DEFAULT_MAX_PROGRESS = 0;
 
-  private class ProgressThread extends Thread {
-
-    private volatile boolean keepGoing; // While this is true, thread runs.
-
-    private Context context;
-    private long startTimeMillis;
-    private long lastReportMillis;
-
-    public ProgressThread(final Context ctxt) {
-      this.context = ctxt;
-      this.keepGoing = true;
-    }
-
-    public void signalShutdown() {
-      this.keepGoing = false; // volatile update.
-      this.interrupt();
-    }
-
-    public void run() {
-      this.lastReportMillis = System.currentTimeMillis();
-      this.startTimeMillis = this.lastReportMillis;
-
-      final long MAX_PROGRESS = AutoProgressMapper.this.maxProgressPeriod;
-      final long REPORT_INTERVAL = AutoProgressMapper.this.reportInterval;
-      final long SLEEP_INTERVAL = AutoProgressMapper.this.sleepInterval;
-
-      // In a loop:
-      //   * Check that we haven't run for too long (maxProgressPeriod).
-      //   * If it's been a report interval since we last made progress,
-      //     make more.
-      //   * Sleep for a bit.
-      //   * If the parent thread has signaled for exit, do so.
-      while (this.keepGoing) {
-        long curTimeMillis = System.currentTimeMillis();
-
-        if (MAX_PROGRESS != 0
-            && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) {
-          this.keepGoing = false;
-          LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS
-              + " ms.");
-          break;
-        }
-
-        if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {
-          // It's been a full report interval -- claim progress.
-          LOG.debug("Auto-progress thread reporting progress");
-          this.context.progress();
-          this.lastReportMillis = curTimeMillis;
-        }
-
-        // Unless we got an interrupt while we were working,
-        // sleep a bit before doing more work.
-        if (!Thread.interrupted()) {
-          try {
-            Thread.sleep(SLEEP_INTERVAL);
-          } catch (InterruptedException ie) {
-            // we were notified on something; not necessarily an error.
-          }
-        }
-      }
-
-      LOG.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing);
-    }
-  }
-
-  /**
-   * Set configuration parameters for the auto-progress thread.
-   */
-  private void configureAutoProgress(Configuration job) {
-    this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY,
-        DEFAULT_MAX_PROGRESS);
-    this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY,
-        DEFAULT_SLEEP_INTERVAL);
-    this.reportInterval = job.getInt(REPORT_INTERVAL_KEY,
-        DEFAULT_REPORT_INTERVAL);
-
-    if (this.reportInterval < 1) {
-      LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to "
-          + DEFAULT_REPORT_INTERVAL);
-      this.reportInterval = DEFAULT_REPORT_INTERVAL;
-    }
-
-    if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {
-      LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to "
-          + DEFAULT_SLEEP_INTERVAL);
-      this.sleepInterval = DEFAULT_SLEEP_INTERVAL;
-    }
-
-    if (this.maxProgressPeriod < 0) {
-      LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to "
-          + DEFAULT_MAX_PROGRESS);
-      this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;
-    }
-  }
-
-
   // map() method intentionally omitted; Mapper.map() is the identity mapper.
 
-
   /**
    * Run the mapping process for this task, wrapped in an auto-progress system.
    */
   @Override
   public void run(Context context) throws IOException, InterruptedException {
-    configureAutoProgress(context.getConfiguration());
-    ProgressThread thread = this.new ProgressThread(context);
+    ProgressThread thread = new ProgressThread(context, LOG);
 
     try {
       thread.setDaemon(true);
@@ -191,7 +73,7 @@ public class AutoProgressMapper<KEYIN, V
         LOG.debug("Progress thread shutdown detected.");
       } catch (InterruptedException ie) {
         LOG.warn("Interrupted when waiting on auto-progress thread: "
-            + ie.toString());
+            + ie.toString(), ie);
       }
     }
   }

Added: sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressReducer.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressReducer.java?rev=1374923&view=auto
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressReducer.java (added)
+++ sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressReducer.java Mon Aug 20 06:21:34 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Identity reducer that continuously reports progress via a background thread.
+ */
+public class AutoProgressReducer<KEYIN, VALIN, KEYOUT, VALOUT>
+    extends SqoopReducer<KEYIN, VALIN, KEYOUT, VALOUT> {
+
+  public static final Log LOG = LogFactory.getLog(
+      AutoProgressReducer.class.getName());
+
+  // reduce() method intentionally omitted;
+  // Reducer.reduce() is the identity reducer.
+
+  /**
+   * Run the mapping process for this task, wrapped in an auto-progress system.
+   */
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    ProgressThread thread = new ProgressThread(context, LOG);
+
+    try {
+      thread.setDaemon(true);
+      thread.start();
+
+      // use default run() method to actually drive the mapping.
+      super.run(context);
+    } finally {
+      // Tell the progress thread to exit..
+      LOG.debug("Instructing auto-progress thread to quit.");
+      thread.signalShutdown();
+      try {
+        // And wait for that to happen.
+        LOG.debug("Waiting for progress thread shutdown...");
+        thread.join();
+        LOG.debug("Progress thread shutdown detected.");
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted when waiting on auto-progress thread: "
+            + ie.toString(), ie);
+      }
+    }
+  }
+}

Added: sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java?rev=1374923&view=auto
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java (added)
+++ sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java Mon Aug 20 06:21:34 2012
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.SqoopOptions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.config.ConfigurationHelper;
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.orm.TableClassName;
+
+
+/**
+ * Class that runs an export job using pg_bulkload in the mapper.
+ */
+public class PGBulkloadExportJob extends ExportJobBase {
+
+  public static final Log LOG =
+      LogFactory.getLog(PGBulkloadExportJob.class.getName());
+
+
+  public PGBulkloadExportJob(final ExportJobContext context) {
+    super(context);
+  }
+
+
+  public PGBulkloadExportJob(final ExportJobContext ctxt,
+      final Class<? extends Mapper> mapperClass,
+      final Class<? extends InputFormat> inputFormatClass,
+      final Class<? extends OutputFormat> outputFormatClass) {
+    super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+  }
+
+
+  @Override
+  protected void configureInputFormat(Job job, String tableName,
+                                      String tableClassName, String splitByCol)
+    throws ClassNotFoundException, IOException {
+    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+    ConnManager mgr = context.getConnManager();
+    String username = options.getUsername();
+    if (null == username || username.length() == 0) {
+      DBConfiguration.configureDB(job.getConfiguration(),
+                                  mgr.getDriverClass(),
+                                  options.getConnectString(),
+                                  options.getFetchSize());
+    } else {
+      DBConfiguration.configureDB(job.getConfiguration(),
+                                  mgr.getDriverClass(),
+                                  options.getConnectString(),
+                                  username, options.getPassword(),
+                                  options.getFetchSize());
+    }
+  }
+
+
+  @Override
+  protected Class<? extends Mapper> getMapperClass() {
+    return PGBulkloadExportMapper.class;
+  }
+
+
+  protected Class<? extends Reducer> getReducerClass() {
+    return PGBulkloadExportReducer.class;
+  }
+
+
+  private void setDelimiter(String prop, char val, Configuration conf) {
+    switch (val) {
+    case DelimiterSet.NULL_CHAR:
+      break;
+    case '\t':
+    default:
+      conf.set(prop, String.valueOf(val));
+    }
+  }
+
+
+  @Override
+  protected void propagateOptionsToJob(Job job) {
+    super.propagateOptionsToJob(job);
+    SqoopOptions opts = context.getOptions();
+    Configuration conf = job.getConfiguration();
+    conf.setIfUnset("pgbulkload.bin", "pg_bulkload");
+    if (opts.getNullStringValue() != null) {
+      conf.set("pgbulkload.null.string", opts.getNullStringValue());
+    }
+    setDelimiter("pgbulkload.input.field.delim",
+                 opts.getInputFieldDelim(),
+                 conf);
+    setDelimiter("pgbulkload.input.record.delim",
+                 opts.getInputRecordDelim(),
+                 conf);
+    setDelimiter("pgbulkload.input.enclosedby",
+                 opts.getInputEnclosedBy(),
+                 conf);
+    setDelimiter("pgbulkload.input.escapedby",
+                 opts.getInputEscapedBy(),
+                 conf);
+    conf.setBoolean("pgbulkload.input.encloserequired",
+                    opts.isInputEncloseRequired());
+    conf.setIfUnset("pgbulkload.check.constraints", "YES");
+    conf.setIfUnset("pgbulkload.parse.errors", "INFINITE");
+    conf.setIfUnset("pgbulkload.duplicate.errors", "INFINITE");
+    conf.set("mapred.jar", context.getJarFile());
+    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+    conf.setInt("mapred.map.max.attempts", 1);
+    conf.setInt("mapred.reduce.max.attempts", 1);
+    conf.setIfUnset("mapred.reduce.tasks",  "1");
+    if (context.getOptions().doClearStagingTable()) {
+      conf.setBoolean("pgbulkload.clear.staging.table", true);
+    }
+  }
+
+
+  @Override
+  public void runExport() throws ExportException, IOException {
+    ConnManager cmgr = context.getConnManager();
+    SqoopOptions options = context.getOptions();
+    Configuration conf = options.getConf();
+    DBConfiguration dbConf = null;
+    String outputTableName = context.getTableName();
+    String tableName = outputTableName;
+    String tableClassName =
+        new TableClassName(options).getClassForTable(outputTableName);
+
+    LOG.info("Beginning export of " + outputTableName);
+    loadJars(conf, context.getJarFile(), tableClassName);
+
+    try {
+      Job job = new Job(conf);
+      dbConf = new DBConfiguration(job.getConfiguration());
+      dbConf.setOutputTableName(tableName);
+      configureInputFormat(job, tableName, tableClassName, null);
+      configureOutputFormat(job, tableName, tableClassName);
+      configureNumTasks(job);
+      propagateOptionsToJob(job);
+      job.setMapperClass(getMapperClass());
+      job.setMapOutputKeyClass(LongWritable.class);
+      job.setMapOutputValueClass(Text.class);
+      job.setReducerClass(getReducerClass());
+      cacheJars(job, context.getConnManager());
+      setJob(job);
+
+      boolean success = runJob(job);
+      if (!success) {
+        throw new ExportException("Export job failed!");
+      }
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    } finally {
+      unloadJars();
+    }
+  }
+
+
+  @Override
+  protected int configureNumTasks(Job job) throws IOException {
+    SqoopOptions options = context.getOptions();
+    int numMapTasks = options.getNumMappers();
+    if (numMapTasks < 1) {
+      numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
+      LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
+    }
+
+    ConfigurationHelper.setJobNumMaps(job, numMapTasks);
+    return numMapTasks;
+  }
+
+
+  private void clearStagingTable(DBConfiguration dbConf, String tableName)
+    throws IOException {
+    // clearing stagingtable is done each mapper tasks
+  }
+}

Added: sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java?rev=1374923&view=auto
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java (added)
+++ sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java Mon Aug 20 06:21:34 2012
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.PostgreSQLUtils;
+import org.apache.sqoop.util.Executor;
+import org.apache.sqoop.util.JdbcUrl;
+
+
+/**
+ * Mapper that starts a 'pg_bulkload' process and uses that to export rows from
+ * HDFS to a PostgreSQL database at high speed.
+ *
+ * map() methods are actually provided by subclasses that read from
+ * SequenceFiles (containing existing SqoopRecords) or text files
+ * (containing delimited lines) and deliver these results to the stream
+ * used to interface with pg_bulkload.
+ */
+public class PGBulkloadExportMapper
+    extends AutoProgressMapper<LongWritable, Writable, LongWritable, Text> {
+  private Configuration conf;
+  private DBConfiguration dbConf;
+  private Process process;
+  private OutputStream out;
+  protected BufferedWriter writer;
+  private Thread thread;
+  protected String tmpTableName;
+  private String tableName;
+  private String passwordFilename;
+
+
+  public PGBulkloadExportMapper() {
+  }
+
+
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    super.setup(context);
+    conf = context.getConfiguration();
+    dbConf = new DBConfiguration(conf);
+    tableName = dbConf.getOutputTableName();
+    tmpTableName = tableName + "_" + context.getTaskAttemptID().toString();
+
+    Connection conn = null;
+    try {
+      conn = dbConf.getConnection();
+      conn.setAutoCommit(false);
+      if (conf.getBoolean("pgbulkload.clear.staging.table", false)) {
+        StringBuffer query = new StringBuffer();
+        query.append("DROP TABLE IF EXISTS ");
+        query.append(tmpTableName);
+        doExecuteUpdate(query.toString());
+      }
+      StringBuffer query = new StringBuffer();
+      query.append("CREATE TABLE ");
+      query.append(tmpTableName);
+      query.append("(LIKE ");
+      query.append(tableName);
+      query.append(" INCLUDING CONSTRAINTS)");
+      if (conf.get("pgbulkload.staging.tablespace") != null) {
+        query.append("TABLESPACE ");
+        query.append(conf.get("pgbulkload.staging.tablespace"));
+      }
+      doExecuteUpdate(query.toString());
+      conn.commit();
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Unable to load JDBC driver class", ex);
+      throw new IOException(ex);
+    } catch (SQLException ex) {
+      LOG.error("Unable to execute statement", ex);
+      throw new IOException(ex);
+    } finally {
+      try {
+        conn.close();
+      } catch (SQLException ex) {
+        LOG.error("Unable to close connection", ex);
+      }
+    }
+
+    try {
+      ArrayList<String> args = new ArrayList<String>();
+      List<String> envp = Executor.getCurEnvpStrings();
+      args.add(conf.get("pgbulkload.bin", "pg_bulkload"));
+      args.add("--username="
+          + conf.get(DBConfiguration.USERNAME_PROPERTY));
+      args.add("--dbname="
+          + JdbcUrl.getDatabaseName(conf.get(DBConfiguration.URL_PROPERTY)));
+      args.add("--host="
+          + JdbcUrl.getHostName(conf.get(DBConfiguration.URL_PROPERTY)));
+      args.add("--port="
+          + JdbcUrl.getPort(conf.get(DBConfiguration.URL_PROPERTY)));
+      args.add("--input=stdin");
+      args.add("--output=" + tmpTableName);
+      args.add("-o");
+      args.add("TYPE=CSV");
+      args.add("-o");
+      args.add("DELIMITER=" + conf.get("pgbulkload.input.field.delim", ","));
+      args.add("-o");
+      args.add("QUOTE=" + conf.get("pgbulkload.input.enclosedby", "\""));
+      args.add("-o");
+      args.add("ESCAPE=" + conf.get("pgbulkload.input.escapedby", "\""));
+      args.add("-o");
+      args.add("CHECK_CONSTRAINTS=" + conf.get("pgbulkload.check.constraints"));
+      args.add("-o");
+      args.add("PARSE_ERRORS=" + conf.get("pgbulkload.parse.errors"));
+      args.add("-o");
+      args.add("DUPLICATE_ERRORS=" + conf.get("pgbulkload.duplicate.errors"));
+      if (conf.get("pgbulkload.null.string") != null) {
+        args.add("-o");
+        args.add("NULL=" + conf.get("pgbulkload.null.string"));
+      }
+      if (conf.get("pgbulkload.filter") != null) {
+        args.add("-o");
+        args.add("FILTER=" + conf.get("pgbulkload.filter"));
+      }
+      LOG.debug("Starting pg_bulkload with arguments:");
+      for (String arg : args) {
+        LOG.debug("  " + arg);
+      }
+      if (conf.get(DBConfiguration.PASSWORD_PROPERTY) != null) {
+        String tmpDir = System.getProperty("test.build.data", "/tmp/");
+        if (!tmpDir.endsWith(File.separator)) {
+          tmpDir = tmpDir + File.separator;
+        }
+        tmpDir = conf.get("job.local.dir", tmpDir);
+        passwordFilename = PostgreSQLUtils.writePasswordFile(tmpDir,
+            conf.get(DBConfiguration.PASSWORD_PROPERTY));
+        envp.add("PGPASSFILE=" + passwordFilename);
+      }
+      process = Runtime.getRuntime().exec(args.toArray(new String[0]),
+                                          envp.toArray(new String[0]));
+      out = process.getOutputStream();
+      writer = new BufferedWriter(new OutputStreamWriter(out));
+      thread = new ReadThread(process.getErrorStream());
+      thread.start();
+    } catch (Exception e) {
+      cleanup(context);
+      doExecuteUpdate("DROP TABLE " + tmpTableName);
+      throw new IOException(e);
+    }
+  }
+
+
+  public void map(LongWritable key, Writable value, Context context)
+    throws IOException, InterruptedException {
+    try {
+      String str = value.toString();
+      if (value instanceof Text) {
+        writer.write(str, 0, str.length());
+        writer.newLine();
+      } else if (value instanceof SqoopRecord) {
+        writer.write(str, 0, str.length());
+      }
+    } catch (Exception e) {
+      doExecuteUpdate("DROP TABLE " + tmpTableName);
+      cleanup(context);
+      throw new IOException(e);
+    }
+  }
+
+
+  protected void cleanup(Context context)
+    throws IOException, InterruptedException {
+    LongWritable taskid =
+      new LongWritable(context.getTaskAttemptID().getTaskID().getId());
+    context.write(taskid, new Text(tmpTableName));
+    writer.close();
+    out.close();
+    try {
+      thread.join();
+    } finally {
+      // block until the process is done.
+      int result = 0;
+      if (null != process) {
+        while (true) {
+          try {
+            result = process.waitFor();
+          } catch (InterruptedException ie) {
+            // interrupted; loop around.
+            continue;
+          }
+          break;
+        }
+      }
+    }
+    if (null != passwordFilename) {
+      if (!new File(passwordFilename).delete()) {
+        LOG.error("Could not remove postgresql password file "
+                  + passwordFilename);
+        LOG.error("You should remove this file to protect your credentials.");
+      }
+    }
+  }
+
+
+  protected int doExecuteUpdate(String query) throws IOException {
+    Connection conn = null;
+    try {
+      conn = dbConf.getConnection();
+      conn.setAutoCommit(false);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Unable to load JDBC driver class", ex);
+      throw new IOException(ex);
+    } catch (SQLException ex) {
+      LOG.error("Unable to connect to database", ex);
+      throw new IOException(ex);
+    }
+    Statement stmt = null;
+    try {
+      stmt = conn.createStatement();
+      int ret = stmt.executeUpdate(query);
+      conn.commit();
+      return ret;
+    } catch (SQLException ex) {
+      LOG.error("Unable to execute query: "  + query, ex);
+      throw new IOException(ex);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LOG.error("Unable to close statement", ex);
+        }
+      }
+      try {
+        conn.close();
+      } catch (SQLException ex) {
+        LOG.error("Unable to close connection", ex);
+      }
+    }
+  }
+
+
+  private class ReadThread extends Thread {
+    private InputStream in;
+
+    ReadThread(InputStream in) {
+      this.in = in;
+    }
+
+    public void run() {
+      BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+      String line = null;
+      try {
+        while((line = reader.readLine()) != null) {
+          System.out.println(line);
+        }
+        reader.close();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}

Added: sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java?rev=1374923&view=auto
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java (added)
+++ sqoop/trunk/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java Mon Aug 20 06:21:34 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+
+
+/**
+ * Reducer for transfering data from temporary table to destination.
+ * Reducer drops all temporary tables if all data successfully transfered.
+ * Temporary tables is not dropptd in error case for manual retry.
+ */
+public class PGBulkloadExportReducer
+    extends AutoProgressReducer<LongWritable, Text,
+                                NullWritable, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(PGBulkloadExportReducer.class.getName());
+  private Configuration conf;
+  private DBConfiguration dbConf;
+  private Connection conn;
+  private String tableName;
+
+
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    conf = context.getConfiguration();
+    dbConf = new DBConfiguration(conf);
+    tableName = dbConf.getOutputTableName();
+    try {
+      conn = dbConf.getConnection();
+      conn.setAutoCommit(false);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Unable to load JDBC driver class", ex);
+      throw new IOException(ex);
+    } catch (SQLException ex) {
+      LOG.error("Unable to connect to database", ex);
+      throw new IOException(ex);
+    }
+  }
+
+
+  @Override
+    public void reduce(LongWritable key, Iterable<Text> values, Context context)
+    throws IOException, InterruptedException {
+    Statement stmt = null;
+    try {
+      stmt = conn.createStatement();
+      for (Text value : values) {
+        int inserted = stmt.executeUpdate("INSERT INTO " + tableName
+                                          + " ( SELECT * FROM " + value + " )");
+        stmt.executeUpdate("DROP TABLE " + value);
+      }
+      conn.commit();
+    } catch (SQLException ex) {
+      LOG.error("Unable to execute create query.", ex);
+      throw new IOException(ex);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LOG.error("Unable to close statement", ex);
+        }
+      }
+    }
+  }
+
+
+  protected void cleanup(Context context)
+    throws IOException, InterruptedException {
+    try {
+      conn.close();
+    } catch (SQLException ex) {
+      LOG.error("Unable to load JDBC driver class", ex);
+      throw new IOException(ex);
+    }
+  }
+
+}

Added: sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ProgressThread.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ProgressThread.java?rev=1374923&view=auto
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ProgressThread.java (added)
+++ sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ProgressThread.java Mon Aug 20 06:21:34 2012
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+
+/**
+  * Run the task process for auto-progress tasks.
+  */
+public class ProgressThread extends Thread {
+
+  private static Log log = null;
+
+  /**
+   * Total number of millis for which progress will be reported by the
+   * auto-progress thread. If this is zero, then the auto-progress thread will
+   * never voluntarily exit.
+   */
+  private int maxProgressPeriod;
+
+  /**
+   * Number of milliseconds to sleep for between loop iterations. Must be less
+   * than report interval.
+   */
+  private int sleepInterval;
+
+  /**
+   * Number of milliseconds between calls to Reporter.progress().
+   * Should be a multiple of the sleepInterval.
+   */
+  private int reportInterval;
+
+  public static final String MAX_PROGRESS_PERIOD_KEY =
+      "sqoop.mapred.auto.progress.max";
+  public static final String SLEEP_INTERVAL_KEY =
+      "sqoop.mapred.auto.progress.sleep";
+  public static final String REPORT_INTERVAL_KEY =
+      "sqoop.mapred.auto.progress.report";
+
+  // Sleep for 10 seconds at a time.
+  public static final int DEFAULT_SLEEP_INTERVAL = 10000;
+
+  // Report progress every 30 seconds.
+  public static final int DEFAULT_REPORT_INTERVAL = 30000;
+
+  // Disable max progress, by default.
+  public static final int DEFAULT_MAX_PROGRESS = 0;
+
+  private volatile boolean keepGoing; // While this is true, thread runs.
+
+  private TaskInputOutputContext context;
+  private long startTimeMillis;
+  private long lastReportMillis;
+
+  public ProgressThread(final TaskInputOutputContext ctxt, Log log) {
+    this.context = ctxt;
+    this.log = log;
+    this.keepGoing = true;
+    configureAutoProgress(ctxt.getConfiguration());
+  }
+
+  /**
+   * Set configuration parameters for the auto-progress thread.
+   */
+  private void configureAutoProgress(Configuration job) {
+    this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY,
+                                        DEFAULT_MAX_PROGRESS);
+    this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY,
+                                    DEFAULT_SLEEP_INTERVAL);
+    this.reportInterval = job.getInt(REPORT_INTERVAL_KEY,
+                                     DEFAULT_REPORT_INTERVAL);
+
+    if (this.reportInterval < 1) {
+      log.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to "
+               + DEFAULT_REPORT_INTERVAL);
+      this.reportInterval = DEFAULT_REPORT_INTERVAL;
+    }
+
+    if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {
+      log.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to "
+               + DEFAULT_SLEEP_INTERVAL);
+      this.sleepInterval = DEFAULT_SLEEP_INTERVAL;
+    }
+
+    if (this.maxProgressPeriod < 0) {
+      log.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to "
+               + DEFAULT_MAX_PROGRESS);
+      this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;
+    }
+  }
+
+  public void signalShutdown() {
+    this.keepGoing = false; // volatile update.
+    this.interrupt();
+  }
+
+  public void run() {
+    this.lastReportMillis = System.currentTimeMillis();
+    this.startTimeMillis = this.lastReportMillis;
+
+    final long MAX_PROGRESS = this.maxProgressPeriod;
+    final long REPORT_INTERVAL = this.reportInterval;
+    final long SLEEP_INTERVAL = this.sleepInterval;
+
+    // In a loop:
+    //   * Check that we haven't run for too long (maxProgressPeriod).
+    //   * If it's been a report interval since we last made progress,
+    //     make more.
+    //   * Sleep for a bit.
+    //   * If the parent thread has signaled for exit, do so.
+    while (this.keepGoing) {
+      long curTimeMillis = System.currentTimeMillis();
+
+      if (MAX_PROGRESS != 0
+          && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) {
+        this.keepGoing = false;
+        log.info("Auto-progress thread exiting after " + MAX_PROGRESS
+                 + " ms.");
+        break;
+      }
+
+      if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {
+        // It's been a full report interval -- claim progress.
+        log.debug("Auto-progress thread reporting progress");
+        this.context.progress();
+        this.lastReportMillis = curTimeMillis;
+      }
+
+      // Unless we got an interrupt while we were working,
+      // sleep a bit before doing more work.
+      if (!Thread.interrupted()) {
+        try {
+          Thread.sleep(SLEEP_INTERVAL);
+        } catch (InterruptedException ie) {
+          // we were notified on something; not necessarily an error.
+        }
+      }
+    }
+    log.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing);
+  }
+}

Added: sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SqoopReducer.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SqoopReducer.java?rev=1374923&view=auto
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SqoopReducer.java (added)
+++ sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SqoopReducer.java Mon Aug 20 06:21:34 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.sqoop.util.LoggingUtils;
+
+import java.io.IOException;
+
+/**
+ * Base sqoop reducer class that is convenient place for common functionality.
+ * Other specific reducers are highly encouraged to inherit from this class.
+ */
+public abstract class SqoopReducer<KI, VI, KO, VO>
+  extends Reducer<KI, VI, KO, VO> {
+
+  @Override
+  protected void setup(Context context)
+    throws IOException, InterruptedException {
+    super.setup(context);
+
+    Configuration configuration = context.getConfiguration();
+
+    // Propagate verbose flag if needed
+    if (configuration.getBoolean(JobBase.PROPERTY_VERBOSE, false)) {
+      LoggingUtils.setDebugLevel();
+    }
+  }
+}

Added: sqoop/trunk/src/java/org/apache/sqoop/util/PostgreSQLUtils.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/util/PostgreSQLUtils.java?rev=1374923&view=auto
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/util/PostgreSQLUtils.java (added)
+++ sqoop/trunk/src/java/org/apache/sqoop/util/PostgreSQLUtils.java Mon Aug 20 06:21:34 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import java.io.IOException;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utility methods for PostgreSQL import/export.
+ */
+public final class PostgreSQLUtils {
+
+  public static final Log LOG =
+    LogFactory.getLog(PostgreSQLUtils.class.getName());
+
+  private PostgreSQLUtils() {
+  }
+
+  /** Write the user's password to a file that is chmod 0600.
+      @return the filename.
+  */
+  public static String writePasswordFile(String tmpDir, String password)
+    throws IOException {
+    File tempFile = File.createTempFile("pgpass", ".pgpass", new File(tmpDir));
+    LOG.debug("Writing password to tempfile: " + tempFile);
+
+    // Make sure it's only readable by the current user.
+    DirectImportUtils.setFilePermissions(tempFile, "0600");
+
+    // Actually write the password data into the file.
+    BufferedWriter w = new BufferedWriter(
+        new OutputStreamWriter(new FileOutputStream(tempFile)));
+    w.write("*:*:*:*:" + password);
+    w.close();
+    return tempFile.toString();
+  }
+}

Added: sqoop/trunk/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java?rev=1374923&view=auto
==============================================================================
--- sqoop/trunk/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java (added)
+++ sqoop/trunk/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java Mon Aug 20 06:21:34 2012
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import com.cloudera.sqoop.TestExport;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+
+
+/**
+ * Test the PGBulkloadManager implementations.
+ * PGBulkloadManager uses both JDBC driver and pg_bulkload to facilitate it.
+ *
+ * Since this requires a Postgresql installation on your local machine to use,
+ * this class is named in such a way that Hadoop's default QA process does not
+ * run it.
+ *
+ * You need to run this manually with -Dtestcase=PGBulkloadManagerManualTest.
+ *
+ * You need to put Postgresql's JDBC driver library into lib dir.
+ *
+ * You need to create a sqooptest superuser and database and tablespace,
+ * and install pg_bulkload for sqooptest database:
+ *
+ * $ sudo -u postgres createuser -U postgres -s sqooptest
+ * $ sudo -u postgres createdb -U sqooptest sqooptest
+ * $ sudo -u postgres mkdir /var/pgdata/stagingtablespace
+ * $ psql -U sqooptest
+ *        -f /usr/local/share/postgresql/contrib/pg_bulkload.sql sqooptest
+ * $ psql -U sqooptest sqooptest
+ * sqooptest=# CREATE USER sqooptest;
+ * sqooptest=# CREATE DATABASE sqooptest;
+ * sqooptest=# CREATE TABLESPACE sqooptest
+ *                 LOCATION '/var/pgdata/stagingtablespace';
+ * sqooptest=# \q
+ *
+ */
+public class PGBulkloadManagerManualTest extends TestExport {
+
+  public static final Log LOG =
+      LogFactory.getLog(PGBulkloadManagerManualTest.class.getName());
+  private DBConfiguration dbConf;
+
+
+  public PGBulkloadManagerManualTest() {
+    Configuration conf = getConf();
+    DBConfiguration.configureDB(conf,
+                                "org.postgresql.Driver",
+                                getConnectString(),
+                                getUserName(),
+                                null, null);
+    dbConf = new DBConfiguration(conf);
+  }
+
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+
+  @Override
+  protected String getConnectString() {
+    return "jdbc:postgresql://localhost:5432/sqooptest";
+  }
+
+
+  protected String getUserName() {
+    return "sqooptest";
+  }
+
+
+  @Override
+  protected String getTablePrefix() {
+    return super.getTablePrefix().toLowerCase();
+  }
+
+
+  @Override
+  protected String getTableName() {
+    return super.getTableName().toLowerCase();
+  }
+
+  @Override
+  public String getStagingTableName() {
+    return super.getStagingTableName().toLowerCase();
+  }
+
+
+  @Override
+  protected Connection getConnection() {
+    try {
+      Connection conn = dbConf.getConnection();
+      conn.setAutoCommit(false);
+      PreparedStatement stmt =
+          conn.prepareStatement("SET extra_float_digits TO 0");
+      stmt.executeUpdate();
+      conn.commit();
+      return conn;
+    } catch (SQLException sqlE) {
+      LOG.error("Could not get connection to test server: " + sqlE);
+      return null;
+    } catch (ClassNotFoundException cnfE) {
+      LOG.error("Could not find driver class: " + cnfE);
+      return null;
+    }
+  }
+
+
+  @Override
+  protected String getDropTableStatement(String tableName) {
+    return "DROP TABLE IF EXISTS " + tableName;
+  }
+
+
+  @Override
+  protected String[] getArgv(boolean includeHadoopFlags,
+                             int rowsPerStatement,
+                             int statementsPerTx,
+                             String... additionalArgv) {
+    ArrayList<String> args =
+        new ArrayList<String>(Arrays.asList(additionalArgv));
+    args.add("--username");
+    args.add(getUserName());
+    args.add("--connection-manager");
+    args.add("org.apache.sqoop.manager.PGBulkloadManager");
+    args.add("--staging-table");
+    args.add("dummy");
+    args.add("--clear-staging-table");
+    return super.getArgv(includeHadoopFlags,
+                         rowsPerStatement,
+                         statementsPerTx,
+                         args.toArray(new String[0]));
+  }
+
+
+  @Override
+  protected String [] getCodeGenArgv(String... extraArgs) {
+    ArrayList<String> args = new ArrayList<String>(Arrays.asList(extraArgs));
+    args.add("--username");
+    args.add(getUserName());
+    return super.getCodeGenArgv(args.toArray(new String[0]));
+  }
+
+
+  @Override
+  public void testColumnsExport() throws IOException, SQLException {
+    // PGBulkloadManager does not support --columns option.
+  }
+
+
+  public void testMultiReduceExport() throws IOException, SQLException {
+    String[] genericargs = newStrArray(null, "-Dmapred.reduce.tasks=2");
+    multiFileTestWithGenericArgs(2, 10, 2, genericargs);
+  }
+
+
+  public void testExportWithTablespace() throws IOException, SQLException {
+    String[] genericargs =
+      newStrArray(null, "-Dpgbulkload.staging.tablespace=sqooptest");
+    multiFileTestWithGenericArgs(1, 10, 1, genericargs);
+  }
+
+
+  protected void multiFileTestWithGenericArgs(int numFiles,
+                                              int recordsPerMap,
+                                              int numMaps,
+                                              String[] genericargs,
+                                              String... argv)
+    throws IOException, SQLException {
+
+    final int TOTAL_RECORDS = numFiles * recordsPerMap;
+
+    try {
+      LOG.info("Beginning test: numFiles=" + numFiles + "; recordsPerMap="
+               + recordsPerMap + "; numMaps=" + numMaps);
+      LOG.info("  with genericargs: ");
+      for (String arg : genericargs) {
+        LOG.info("    " + arg);
+      }
+
+      for (int i = 0; i < numFiles; i++) {
+        createTextFile(i, recordsPerMap, false);
+      }
+
+      createTable();
+
+      runExport(getArgv(true, 10, 10,
+                        newStrArray(newStrArray(genericargs, argv),
+                                    "-m", "" + numMaps)));
+      verifyExport(TOTAL_RECORDS);
+    } finally {
+      LOG.info("multi-reduce test complete");
+    }
+  }
+}