You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/07/10 04:46:18 UTC

git commit: SQOOP-1118: Move PostgreSQL specific MR codes to org.apache.sqoop.mapreduce.posgresql

Updated Branches:
  refs/heads/trunk fb29b8f9f -> b8fd60202


SQOOP-1118: Move PostgreSQL specific MR codes to org.apache.sqoop.mapreduce.posgresql

(Masatake Iwasaki via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b8fd6020
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b8fd6020
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b8fd6020

Branch: refs/heads/trunk
Commit: b8fd60202865fa5c58c61361c45496e01bf05456
Parents: fb29b8f
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Jul 9 19:45:32 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Jul 9 19:45:32 2013 -0700

----------------------------------------------------------------------
 .../apache/sqoop/manager/PGBulkloadManager.java |   2 +-
 .../sqoop/mapreduce/PGBulkloadExportJob.java    | 209 -------------
 .../sqoop/mapreduce/PGBulkloadExportMapper.java | 310 ------------------
 .../mapreduce/PGBulkloadExportReducer.java      | 107 -------
 .../postgresql/PGBulkloadExportJob.java         | 210 +++++++++++++
 .../postgresql/PGBulkloadExportMapper.java      | 311 +++++++++++++++++++
 .../postgresql/PGBulkloadExportReducer.java     | 108 +++++++
 7 files changed, 630 insertions(+), 627 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
index 091fd15..04e1443 100644
--- a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
+++ b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sqoop.mapreduce.ExportInputFormat;
-import org.apache.sqoop.mapreduce.PGBulkloadExportJob;
+import org.apache.sqoop.mapreduce.postgresql.PGBulkloadExportJob;
 
 
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
deleted file mode 100644
index cc60233..0000000
--- a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce;
-
-import java.io.IOException;
-import com.cloudera.sqoop.manager.ExportJobContext;
-import com.cloudera.sqoop.util.ExportException;
-import com.cloudera.sqoop.SqoopOptions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.config.ConfigurationHelper;
-import org.apache.sqoop.lib.DelimiterSet;
-import org.apache.sqoop.manager.ConnManager;
-import org.apache.sqoop.mapreduce.db.DBConfiguration;
-import org.apache.sqoop.orm.TableClassName;
-
-
-/**
- * Class that runs an export job using pg_bulkload in the mapper.
- */
-public class PGBulkloadExportJob extends ExportJobBase {
-
-  public static final Log LOG =
-      LogFactory.getLog(PGBulkloadExportJob.class.getName());
-
-
-  public PGBulkloadExportJob(final ExportJobContext context) {
-    super(context);
-  }
-
-
-  public PGBulkloadExportJob(final ExportJobContext ctxt,
-      final Class<? extends Mapper> mapperClass,
-      final Class<? extends InputFormat> inputFormatClass,
-      final Class<? extends OutputFormat> outputFormatClass) {
-    super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
-  }
-
-
-  @Override
-  protected void configureInputFormat(Job job, String tableName,
-                                      String tableClassName, String splitByCol)
-    throws ClassNotFoundException, IOException {
-    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
-    ConnManager mgr = context.getConnManager();
-    String username = options.getUsername();
-    if (null == username || username.length() == 0) {
-      DBConfiguration.configureDB(job.getConfiguration(),
-                                  mgr.getDriverClass(),
-                                  options.getConnectString(),
-                                  options.getFetchSize(),
-                                  options.getConnectionParams());
-    } else {
-      DBConfiguration.configureDB(job.getConfiguration(),
-                                  mgr.getDriverClass(),
-                                  options.getConnectString(),
-                                  username, options.getPassword(),
-                                  options.getFetchSize(),
-                                  options.getConnectionParams());
-    }
-  }
-
-
-  @Override
-  protected Class<? extends Mapper> getMapperClass() {
-    return PGBulkloadExportMapper.class;
-  }
-
-
-  protected Class<? extends Reducer> getReducerClass() {
-    return PGBulkloadExportReducer.class;
-  }
-
-
-  private void setDelimiter(String prop, char val, Configuration conf) {
-    switch (val) {
-    case DelimiterSet.NULL_CHAR:
-      break;
-    case '\t':
-    default:
-      conf.set(prop, String.valueOf(val));
-    }
-  }
-
-
-  @Override
-  protected void propagateOptionsToJob(Job job) {
-    super.propagateOptionsToJob(job);
-    SqoopOptions opts = context.getOptions();
-    Configuration conf = job.getConfiguration();
-    conf.setIfUnset("pgbulkload.bin", "pg_bulkload");
-    if (opts.getNullStringValue() != null) {
-      conf.set("pgbulkload.null.string", opts.getNullStringValue());
-    }
-    setDelimiter("pgbulkload.input.field.delim",
-                 opts.getInputFieldDelim(),
-                 conf);
-    setDelimiter("pgbulkload.input.record.delim",
-                 opts.getInputRecordDelim(),
-                 conf);
-    setDelimiter("pgbulkload.input.enclosedby",
-                 opts.getInputEnclosedBy(),
-                 conf);
-    setDelimiter("pgbulkload.input.escapedby",
-                 opts.getInputEscapedBy(),
-                 conf);
-    conf.setBoolean("pgbulkload.input.encloserequired",
-                    opts.isInputEncloseRequired());
-    conf.setIfUnset("pgbulkload.check.constraints", "YES");
-    conf.setIfUnset("pgbulkload.parse.errors", "INFINITE");
-    conf.setIfUnset("pgbulkload.duplicate.errors", "INFINITE");
-    conf.set("mapred.jar", context.getJarFile());
-    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-    conf.setInt("mapred.map.max.attempts", 1);
-    conf.setInt("mapred.reduce.max.attempts", 1);
-    conf.setIfUnset("mapred.reduce.tasks",  "1");
-    if (context.getOptions().doClearStagingTable()) {
-      conf.setBoolean("pgbulkload.clear.staging.table", true);
-    }
-  }
-
-
-  @Override
-  public void runExport() throws ExportException, IOException {
-    ConnManager cmgr = context.getConnManager();
-    SqoopOptions options = context.getOptions();
-    Configuration conf = options.getConf();
-    DBConfiguration dbConf = null;
-    String outputTableName = context.getTableName();
-    String tableName = outputTableName;
-    String tableClassName =
-        new TableClassName(options).getClassForTable(outputTableName);
-
-    LOG.info("Beginning export of " + outputTableName);
-    loadJars(conf, context.getJarFile(), tableClassName);
-
-    try {
-      Job job = new Job(conf);
-      dbConf = new DBConfiguration(job.getConfiguration());
-      dbConf.setOutputTableName(tableName);
-      configureInputFormat(job, tableName, tableClassName, null);
-      configureOutputFormat(job, tableName, tableClassName);
-      configureNumTasks(job);
-      propagateOptionsToJob(job);
-      job.setMapperClass(getMapperClass());
-      job.setMapOutputKeyClass(LongWritable.class);
-      job.setMapOutputValueClass(Text.class);
-      job.setReducerClass(getReducerClass());
-      cacheJars(job, context.getConnManager());
-      setJob(job);
-
-      boolean success = runJob(job);
-      if (!success) {
-        throw new ExportException("Export job failed!");
-      }
-    } catch (InterruptedException ie) {
-      throw new IOException(ie);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException(cnfe);
-    } finally {
-      unloadJars();
-    }
-  }
-
-
-  @Override
-  protected int configureNumTasks(Job job) throws IOException {
-    SqoopOptions options = context.getOptions();
-    int numMapTasks = options.getNumMappers();
-    if (numMapTasks < 1) {
-      numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
-      LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
-    }
-
-    ConfigurationHelper.setJobNumMaps(job, numMapTasks);
-    return numMapTasks;
-  }
-
-
-  private void clearStagingTable(DBConfiguration dbConf, String tableName)
-    throws IOException {
-    // clearing stagingtable is done each mapper tasks
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java
deleted file mode 100644
index 81b1333..0000000
--- a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.sqoop.lib.SqoopRecord;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.sqoop.mapreduce.db.DBConfiguration;
-import org.apache.sqoop.util.LoggingUtils;
-import org.apache.sqoop.util.PostgreSQLUtils;
-import org.apache.sqoop.util.Executor;
-import org.apache.sqoop.util.JdbcUrl;
-
-
-/**
- * Mapper that starts a 'pg_bulkload' process and uses that to export rows from
- * HDFS to a PostgreSQL database at high speed.
- *
- * map() methods are actually provided by subclasses that read from
- * SequenceFiles (containing existing SqoopRecords) or text files
- * (containing delimited lines) and deliver these results to the stream
- * used to interface with pg_bulkload.
- */
-public class PGBulkloadExportMapper
-    extends AutoProgressMapper<LongWritable, Writable, LongWritable, Text> {
-  private Configuration conf;
-  private DBConfiguration dbConf;
-  private Process process;
-  private OutputStream out;
-  protected BufferedWriter writer;
-  private Thread thread;
-  protected String tmpTableName;
-  private String tableName;
-  private String passwordFilename;
-
-
-  public PGBulkloadExportMapper() {
-  }
-
-
-  protected void setup(Context context)
-      throws IOException, InterruptedException {
-    super.setup(context);
-    conf = context.getConfiguration();
-    dbConf = new DBConfiguration(conf);
-    tableName = dbConf.getOutputTableName();
-    tmpTableName = tableName + "_" + context.getTaskAttemptID().toString();
-
-    Connection conn = null;
-    try {
-      conn = dbConf.getConnection();
-      conn.setAutoCommit(false);
-      if (conf.getBoolean("pgbulkload.clear.staging.table", false)) {
-        StringBuffer query = new StringBuffer();
-        query.append("DROP TABLE IF EXISTS ");
-        query.append(tmpTableName);
-        doExecuteUpdate(query.toString());
-      }
-      StringBuffer query = new StringBuffer();
-      query.append("CREATE TABLE ");
-      query.append(tmpTableName);
-      query.append("(LIKE ");
-      query.append(tableName);
-      query.append(" INCLUDING CONSTRAINTS)");
-      if (conf.get("pgbulkload.staging.tablespace") != null) {
-        query.append("TABLESPACE ");
-        query.append(conf.get("pgbulkload.staging.tablespace"));
-      }
-      doExecuteUpdate(query.toString());
-      conn.commit();
-    } catch (ClassNotFoundException ex) {
-      LOG.error("Unable to load JDBC driver class", ex);
-      throw new IOException(ex);
-    } catch (SQLException ex) {
-      LoggingUtils.logAll(LOG, "Unable to execute statement", ex);
-      throw new IOException(ex);
-    } finally {
-      try {
-        conn.close();
-      } catch (SQLException ex) {
-        LoggingUtils.logAll(LOG, "Unable to close connection", ex);
-      }
-    }
-
-    try {
-      ArrayList<String> args = new ArrayList<String>();
-      List<String> envp = Executor.getCurEnvpStrings();
-      args.add(conf.get("pgbulkload.bin", "pg_bulkload"));
-      args.add("--username="
-          + conf.get(DBConfiguration.USERNAME_PROPERTY));
-      args.add("--dbname="
-          + JdbcUrl.getDatabaseName(conf.get(DBConfiguration.URL_PROPERTY)));
-      args.add("--host="
-          + JdbcUrl.getHostName(conf.get(DBConfiguration.URL_PROPERTY)));
-      args.add("--port="
-          + JdbcUrl.getPort(conf.get(DBConfiguration.URL_PROPERTY)));
-      args.add("--input=stdin");
-      args.add("--output=" + tmpTableName);
-      args.add("-o");
-      args.add("TYPE=CSV");
-      args.add("-o");
-      args.add("DELIMITER=" + conf.get("pgbulkload.input.field.delim", ","));
-      args.add("-o");
-      args.add("QUOTE=" + conf.get("pgbulkload.input.enclosedby", "\""));
-      args.add("-o");
-      args.add("ESCAPE=" + conf.get("pgbulkload.input.escapedby", "\""));
-      args.add("-o");
-      args.add("CHECK_CONSTRAINTS=" + conf.get("pgbulkload.check.constraints"));
-      args.add("-o");
-      args.add("PARSE_ERRORS=" + conf.get("pgbulkload.parse.errors"));
-      args.add("-o");
-      args.add("DUPLICATE_ERRORS=" + conf.get("pgbulkload.duplicate.errors"));
-      if (conf.get("pgbulkload.null.string") != null) {
-        args.add("-o");
-        args.add("NULL=" + conf.get("pgbulkload.null.string"));
-      }
-      if (conf.get("pgbulkload.filter") != null) {
-        args.add("-o");
-        args.add("FILTER=" + conf.get("pgbulkload.filter"));
-      }
-      LOG.debug("Starting pg_bulkload with arguments:");
-      for (String arg : args) {
-        LOG.debug("  " + arg);
-      }
-      if (conf.get(DBConfiguration.PASSWORD_PROPERTY) != null) {
-        String tmpDir = System.getProperty("test.build.data", "/tmp/");
-        if (!tmpDir.endsWith(File.separator)) {
-          tmpDir = tmpDir + File.separator;
-        }
-        tmpDir = conf.get("job.local.dir", tmpDir);
-        passwordFilename = PostgreSQLUtils.writePasswordFile(tmpDir,
-            conf.get(DBConfiguration.PASSWORD_PROPERTY));
-        envp.add("PGPASSFILE=" + passwordFilename);
-      }
-      process = Runtime.getRuntime().exec(args.toArray(new String[0]),
-                                          envp.toArray(new String[0]));
-      out = process.getOutputStream();
-      writer = new BufferedWriter(new OutputStreamWriter(out));
-      thread = new ReadThread(process.getErrorStream());
-      thread.start();
-    } catch (Exception e) {
-      LOG.error("Can't start up pg_bulkload process", e);
-      cleanup(context);
-      doExecuteUpdate("DROP TABLE " + tmpTableName);
-      throw new IOException(e);
-    }
-  }
-
-
-  public void map(LongWritable key, Writable value, Context context)
-    throws IOException, InterruptedException {
-    try {
-      String str = value.toString();
-      if (value instanceof Text) {
-        writer.write(str, 0, str.length());
-        writer.newLine();
-      } else if (value instanceof SqoopRecord) {
-        writer.write(str, 0, str.length());
-      }
-    } catch (Exception e) {
-      doExecuteUpdate("DROP TABLE " + tmpTableName);
-      cleanup(context);
-      throw new IOException(e);
-    }
-  }
-
-
-  protected void cleanup(Context context)
-    throws IOException, InterruptedException {
-    LongWritable taskid =
-      new LongWritable(context.getTaskAttemptID().getTaskID().getId());
-    context.write(taskid, new Text(tmpTableName));
-
-    if (writer != null) {
-      writer.close();
-    }
-    if (out != null) {
-      out.close();
-    }
-    try {
-      if (thread != null) {
-        thread.join();
-      }
-    } finally {
-      // block until the process is done.
-      if (null != process) {
-        while (true) {
-          try {
-            int returnValue = process.waitFor();
-
-            // Check pg_bulkload's process return value
-            if (returnValue != 0) {
-              throw new RuntimeException(
-                "Unexpected return value from pg_bulkload: "+ returnValue);
-            }
-          } catch (InterruptedException ie) {
-            // interrupted; loop around.
-            LOG.debug("Caught interrupted exception waiting for process "
-                + "pg_bulkload.bin to exit");
-            //Clear the interrupted flag.   We have to call Thread.interrupted
-            //to clear for interrupted exceptions from process.waitFor
-            //See http://bugs.sun.com/view_bug.do?bug_id=6420270 for more info
-            Thread.interrupted();
-            continue;
-          }
-          break;
-        }
-      }
-    }
-    if (null != passwordFilename) {
-      if (!new File(passwordFilename).delete()) {
-        LOG.error("Could not remove postgresql password file "
-                  + passwordFilename);
-        LOG.error("You should remove this file to protect your credentials.");
-      }
-    }
-  }
-
-
-  protected int doExecuteUpdate(String query) throws IOException {
-    Connection conn = null;
-    try {
-      conn = dbConf.getConnection();
-      conn.setAutoCommit(false);
-    } catch (ClassNotFoundException ex) {
-      LOG.error("Unable to load JDBC driver class", ex);
-      throw new IOException(ex);
-    } catch (SQLException ex) {
-      LoggingUtils.logAll(LOG, "Unable to connect to database", ex);
-      throw new IOException(ex);
-    }
-    Statement stmt = null;
-    try {
-      stmt = conn.createStatement();
-      int ret = stmt.executeUpdate(query);
-      conn.commit();
-      return ret;
-    } catch (SQLException ex) {
-      LoggingUtils.logAll(LOG, "Unable to execute query: "  + query, ex);
-      throw new IOException(ex);
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException ex) {
-          LoggingUtils.logAll(LOG, "Unable to close statement", ex);
-        }
-      }
-      try {
-        conn.close();
-      } catch (SQLException ex) {
-        LoggingUtils.logAll(LOG, "Unable to close connection", ex);
-      }
-    }
-  }
-
-
-  private class ReadThread extends Thread {
-    private InputStream in;
-
-    ReadThread(InputStream in) {
-      this.in = in;
-    }
-
-    public void run() {
-      BufferedReader reader = new BufferedReader(new InputStreamReader(in));
-      String line = null;
-      try {
-        while((line = reader.readLine()) != null) {
-          System.out.println(line);
-        }
-        reader.close();
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java
deleted file mode 100644
index 6f55861..0000000
--- a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.sqoop.mapreduce.db.DBConfiguration;
-import org.apache.sqoop.util.LoggingUtils;
-
-
-/**
- * Reducer for transfering data from temporary table to destination.
- * Reducer drops all temporary tables if all data successfully transfered.
- * Temporary tables is not dropptd in error case for manual retry.
- */
-public class PGBulkloadExportReducer
-    extends AutoProgressReducer<LongWritable, Text,
-                                NullWritable, NullWritable> {
-
-  public static final Log LOG =
-      LogFactory.getLog(PGBulkloadExportReducer.class.getName());
-  private Configuration conf;
-  private DBConfiguration dbConf;
-  private Connection conn;
-  private String tableName;
-
-
-  protected void setup(Context context)
-      throws IOException, InterruptedException {
-    conf = context.getConfiguration();
-    dbConf = new DBConfiguration(conf);
-    tableName = dbConf.getOutputTableName();
-    try {
-      conn = dbConf.getConnection();
-      conn.setAutoCommit(false);
-    } catch (ClassNotFoundException ex) {
-      LOG.error("Unable to load JDBC driver class", ex);
-      throw new IOException(ex);
-    } catch (SQLException ex) {
-      LoggingUtils.logAll(LOG, "Unable to connect to database", ex);
-      throw new IOException(ex);
-    }
-  }
-
-
-  @Override
-    public void reduce(LongWritable key, Iterable<Text> values, Context context)
-    throws IOException, InterruptedException {
-    Statement stmt = null;
-    try {
-      stmt = conn.createStatement();
-      for (Text value : values) {
-        int inserted = stmt.executeUpdate("INSERT INTO " + tableName
-                                          + " ( SELECT * FROM " + value + " )");
-        stmt.executeUpdate("DROP TABLE " + value);
-      }
-      conn.commit();
-    } catch (SQLException ex) {
-      LoggingUtils.logAll(LOG, "Unable to execute create query.", ex);
-      throw new IOException(ex);
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException ex) {
-          LoggingUtils.logAll(LOG, "Unable to close statement", ex);
-        }
-      }
-    }
-  }
-
-
-  protected void cleanup(Context context)
-    throws IOException, InterruptedException {
-    try {
-      conn.close();
-    } catch (SQLException ex) {
-      LoggingUtils.logAll(LOG, "Unable to load JDBC driver class", ex);
-      throw new IOException(ex);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
new file mode 100644
index 0000000..79fb7da
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.postgresql;
+
+import java.io.IOException;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.SqoopOptions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.config.ConfigurationHelper;
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.mapreduce.ExportJobBase;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.orm.TableClassName;
+
+
+/**
+ * Class that runs an export job using pg_bulkload in the mapper.
+ */
+public class PGBulkloadExportJob extends ExportJobBase {
+
+  public static final Log LOG =
+      LogFactory.getLog(PGBulkloadExportJob.class.getName());
+
+
+  public PGBulkloadExportJob(final ExportJobContext context) {
+    super(context);
+  }
+
+
+  public PGBulkloadExportJob(final ExportJobContext ctxt,
+      final Class<? extends Mapper> mapperClass,
+      final Class<? extends InputFormat> inputFormatClass,
+      final Class<? extends OutputFormat> outputFormatClass) {
+    super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+  }
+
+
+  @Override
+  protected void configureInputFormat(Job job, String tableName,
+                                      String tableClassName, String splitByCol)
+    throws ClassNotFoundException, IOException {
+    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+    ConnManager mgr = context.getConnManager();
+    String username = options.getUsername();
+    if (null == username || username.length() == 0) {
+      DBConfiguration.configureDB(job.getConfiguration(),
+                                  mgr.getDriverClass(),
+                                  options.getConnectString(),
+                                  options.getFetchSize(),
+                                  options.getConnectionParams());
+    } else {
+      DBConfiguration.configureDB(job.getConfiguration(),
+                                  mgr.getDriverClass(),
+                                  options.getConnectString(),
+                                  username, options.getPassword(),
+                                  options.getFetchSize(),
+                                  options.getConnectionParams());
+    }
+  }
+
+
+  @Override
+  protected Class<? extends Mapper> getMapperClass() {
+    return PGBulkloadExportMapper.class;
+  }
+
+
+  protected Class<? extends Reducer> getReducerClass() {
+    return PGBulkloadExportReducer.class;
+  }
+
+
+  private void setDelimiter(String prop, char val, Configuration conf) {
+    switch (val) {
+    case DelimiterSet.NULL_CHAR:
+      break;
+    case '\t':
+    default:
+      conf.set(prop, String.valueOf(val));
+    }
+  }
+
+
+  @Override
+  protected void propagateOptionsToJob(Job job) {
+    super.propagateOptionsToJob(job);
+    SqoopOptions opts = context.getOptions();
+    Configuration conf = job.getConfiguration();
+    conf.setIfUnset("pgbulkload.bin", "pg_bulkload");
+    if (opts.getNullStringValue() != null) {
+      conf.set("pgbulkload.null.string", opts.getNullStringValue());
+    }
+    setDelimiter("pgbulkload.input.field.delim",
+                 opts.getInputFieldDelim(),
+                 conf);
+    setDelimiter("pgbulkload.input.record.delim",
+                 opts.getInputRecordDelim(),
+                 conf);
+    setDelimiter("pgbulkload.input.enclosedby",
+                 opts.getInputEnclosedBy(),
+                 conf);
+    setDelimiter("pgbulkload.input.escapedby",
+                 opts.getInputEscapedBy(),
+                 conf);
+    conf.setBoolean("pgbulkload.input.encloserequired",
+                    opts.isInputEncloseRequired());
+    conf.setIfUnset("pgbulkload.check.constraints", "YES");
+    conf.setIfUnset("pgbulkload.parse.errors", "INFINITE");
+    conf.setIfUnset("pgbulkload.duplicate.errors", "INFINITE");
+    conf.set("mapred.jar", context.getJarFile());
+    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+    conf.setInt("mapred.map.max.attempts", 1);
+    conf.setInt("mapred.reduce.max.attempts", 1);
+    conf.setIfUnset("mapred.reduce.tasks",  "1");
+    if (context.getOptions().doClearStagingTable()) {
+      conf.setBoolean("pgbulkload.clear.staging.table", true);
+    }
+  }
+
+
+  @Override
+  public void runExport() throws ExportException, IOException {
+    ConnManager cmgr = context.getConnManager();
+    SqoopOptions options = context.getOptions();
+    Configuration conf = options.getConf();
+    DBConfiguration dbConf = null;
+    String outputTableName = context.getTableName();
+    String tableName = outputTableName;
+    String tableClassName =
+        new TableClassName(options).getClassForTable(outputTableName);
+
+    LOG.info("Beginning export of " + outputTableName);
+    loadJars(conf, context.getJarFile(), tableClassName);
+
+    try {
+      Job job = new Job(conf);
+      dbConf = new DBConfiguration(job.getConfiguration());
+      dbConf.setOutputTableName(tableName);
+      configureInputFormat(job, tableName, tableClassName, null);
+      configureOutputFormat(job, tableName, tableClassName);
+      configureNumTasks(job);
+      propagateOptionsToJob(job);
+      job.setMapperClass(getMapperClass());
+      job.setMapOutputKeyClass(LongWritable.class);
+      job.setMapOutputValueClass(Text.class);
+      job.setReducerClass(getReducerClass());
+      cacheJars(job, context.getConnManager());
+      setJob(job);
+
+      boolean success = runJob(job);
+      if (!success) {
+        throw new ExportException("Export job failed!");
+      }
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    } finally {
+      unloadJars();
+    }
+  }
+
+
+  @Override
+  protected int configureNumTasks(Job job) throws IOException {
+    SqoopOptions options = context.getOptions();
+    int numMapTasks = options.getNumMappers();
+    if (numMapTasks < 1) {
+      numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
+      LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
+    }
+
+    ConfigurationHelper.setJobNumMaps(job, numMapTasks);
+    return numMapTasks;
+  }
+
+
+  private void clearStagingTable(DBConfiguration dbConf, String tableName)
+    throws IOException {
+    // clearing stagingtable is done each mapper tasks
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java
new file mode 100644
index 0000000..333546f
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.postgresql;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.LoggingUtils;
+import org.apache.sqoop.util.PostgreSQLUtils;
+import org.apache.sqoop.util.Executor;
+import org.apache.sqoop.util.JdbcUrl;
+
+
+/**
+ * Mapper that starts a 'pg_bulkload' process and uses that to export rows from
+ * HDFS to a PostgreSQL database at high speed.
+ *
+ * map() methods are actually provided by subclasses that read from
+ * SequenceFiles (containing existing SqoopRecords) or text files
+ * (containing delimited lines) and deliver these results to the stream
+ * used to interface with pg_bulkload.
+ */
+public class PGBulkloadExportMapper
+    extends AutoProgressMapper<LongWritable, Writable, LongWritable, Text> {
+  private Configuration conf;
+  private DBConfiguration dbConf;
+  private Process process;
+  private OutputStream out;
+  protected BufferedWriter writer;
+  private Thread thread;
+  protected String tmpTableName;
+  private String tableName;
+  private String passwordFilename;
+
+
+  public PGBulkloadExportMapper() {
+  }
+
+
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    super.setup(context);
+    conf = context.getConfiguration();
+    dbConf = new DBConfiguration(conf);
+    tableName = dbConf.getOutputTableName();
+    tmpTableName = tableName + "_" + context.getTaskAttemptID().toString();
+
+    Connection conn = null;
+    try {
+      conn = dbConf.getConnection();
+      conn.setAutoCommit(false);
+      if (conf.getBoolean("pgbulkload.clear.staging.table", false)) {
+        StringBuffer query = new StringBuffer();
+        query.append("DROP TABLE IF EXISTS ");
+        query.append(tmpTableName);
+        doExecuteUpdate(query.toString());
+      }
+      StringBuffer query = new StringBuffer();
+      query.append("CREATE TABLE ");
+      query.append(tmpTableName);
+      query.append("(LIKE ");
+      query.append(tableName);
+      query.append(" INCLUDING CONSTRAINTS)");
+      if (conf.get("pgbulkload.staging.tablespace") != null) {
+        query.append("TABLESPACE ");
+        query.append(conf.get("pgbulkload.staging.tablespace"));
+      }
+      doExecuteUpdate(query.toString());
+      conn.commit();
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Unable to load JDBC driver class", ex);
+      throw new IOException(ex);
+    } catch (SQLException ex) {
+      LoggingUtils.logAll(LOG, "Unable to execute statement", ex);
+      throw new IOException(ex);
+    } finally {
+      try {
+        conn.close();
+      } catch (SQLException ex) {
+        LoggingUtils.logAll(LOG, "Unable to close connection", ex);
+      }
+    }
+
+    try {
+      ArrayList<String> args = new ArrayList<String>();
+      List<String> envp = Executor.getCurEnvpStrings();
+      args.add(conf.get("pgbulkload.bin", "pg_bulkload"));
+      args.add("--username="
+          + conf.get(DBConfiguration.USERNAME_PROPERTY));
+      args.add("--dbname="
+          + JdbcUrl.getDatabaseName(conf.get(DBConfiguration.URL_PROPERTY)));
+      args.add("--host="
+          + JdbcUrl.getHostName(conf.get(DBConfiguration.URL_PROPERTY)));
+      args.add("--port="
+          + JdbcUrl.getPort(conf.get(DBConfiguration.URL_PROPERTY)));
+      args.add("--input=stdin");
+      args.add("--output=" + tmpTableName);
+      args.add("-o");
+      args.add("TYPE=CSV");
+      args.add("-o");
+      args.add("DELIMITER=" + conf.get("pgbulkload.input.field.delim", ","));
+      args.add("-o");
+      args.add("QUOTE=" + conf.get("pgbulkload.input.enclosedby", "\""));
+      args.add("-o");
+      args.add("ESCAPE=" + conf.get("pgbulkload.input.escapedby", "\""));
+      args.add("-o");
+      args.add("CHECK_CONSTRAINTS=" + conf.get("pgbulkload.check.constraints"));
+      args.add("-o");
+      args.add("PARSE_ERRORS=" + conf.get("pgbulkload.parse.errors"));
+      args.add("-o");
+      args.add("DUPLICATE_ERRORS=" + conf.get("pgbulkload.duplicate.errors"));
+      if (conf.get("pgbulkload.null.string") != null) {
+        args.add("-o");
+        args.add("NULL=" + conf.get("pgbulkload.null.string"));
+      }
+      if (conf.get("pgbulkload.filter") != null) {
+        args.add("-o");
+        args.add("FILTER=" + conf.get("pgbulkload.filter"));
+      }
+      LOG.debug("Starting pg_bulkload with arguments:");
+      for (String arg : args) {
+        LOG.debug("  " + arg);
+      }
+      if (conf.get(DBConfiguration.PASSWORD_PROPERTY) != null) {
+        String tmpDir = System.getProperty("test.build.data", "/tmp/");
+        if (!tmpDir.endsWith(File.separator)) {
+          tmpDir = tmpDir + File.separator;
+        }
+        tmpDir = conf.get("job.local.dir", tmpDir);
+        passwordFilename = PostgreSQLUtils.writePasswordFile(tmpDir,
+            conf.get(DBConfiguration.PASSWORD_PROPERTY));
+        envp.add("PGPASSFILE=" + passwordFilename);
+      }
+      process = Runtime.getRuntime().exec(args.toArray(new String[0]),
+                                          envp.toArray(new String[0]));
+      out = process.getOutputStream();
+      writer = new BufferedWriter(new OutputStreamWriter(out));
+      thread = new ReadThread(process.getErrorStream());
+      thread.start();
+    } catch (Exception e) {
+      LOG.error("Can't start up pg_bulkload process", e);
+      cleanup(context);
+      doExecuteUpdate("DROP TABLE " + tmpTableName);
+      throw new IOException(e);
+    }
+  }
+
+
+  public void map(LongWritable key, Writable value, Context context)
+    throws IOException, InterruptedException {
+    try {
+      String str = value.toString();
+      if (value instanceof Text) {
+        writer.write(str, 0, str.length());
+        writer.newLine();
+      } else if (value instanceof SqoopRecord) {
+        writer.write(str, 0, str.length());
+      }
+    } catch (Exception e) {
+      doExecuteUpdate("DROP TABLE " + tmpTableName);
+      cleanup(context);
+      throw new IOException(e);
+    }
+  }
+
+
+  protected void cleanup(Context context)
+    throws IOException, InterruptedException {
+    LongWritable taskid =
+      new LongWritable(context.getTaskAttemptID().getTaskID().getId());
+    context.write(taskid, new Text(tmpTableName));
+
+    if (writer != null) {
+      writer.close();
+    }
+    if (out != null) {
+      out.close();
+    }
+    try {
+      if (thread != null) {
+        thread.join();
+      }
+    } finally {
+      // block until the process is done.
+      if (null != process) {
+        while (true) {
+          try {
+            int returnValue = process.waitFor();
+
+            // Check pg_bulkload's process return value
+            if (returnValue != 0) {
+              throw new RuntimeException(
+                "Unexpected return value from pg_bulkload: "+ returnValue);
+            }
+          } catch (InterruptedException ie) {
+            // interrupted; loop around.
+            LOG.debug("Caught interrupted exception waiting for process "
+                + "pg_bulkload.bin to exit");
+            //Clear the interrupted flag.   We have to call Thread.interrupted
+            //to clear for interrupted exceptions from process.waitFor
+            //See http://bugs.sun.com/view_bug.do?bug_id=6420270 for more info
+            Thread.interrupted();
+            continue;
+          }
+          break;
+        }
+      }
+    }
+    if (null != passwordFilename) {
+      if (!new File(passwordFilename).delete()) {
+        LOG.error("Could not remove postgresql password file "
+                  + passwordFilename);
+        LOG.error("You should remove this file to protect your credentials.");
+      }
+    }
+  }
+
+
+  protected int doExecuteUpdate(String query) throws IOException {
+    Connection conn = null;
+    try {
+      conn = dbConf.getConnection();
+      conn.setAutoCommit(false);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Unable to load JDBC driver class", ex);
+      throw new IOException(ex);
+    } catch (SQLException ex) {
+      LoggingUtils.logAll(LOG, "Unable to connect to database", ex);
+      throw new IOException(ex);
+    }
+    Statement stmt = null;
+    try {
+      stmt = conn.createStatement();
+      int ret = stmt.executeUpdate(query);
+      conn.commit();
+      return ret;
+    } catch (SQLException ex) {
+      LoggingUtils.logAll(LOG, "Unable to execute query: "  + query, ex);
+      throw new IOException(ex);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LoggingUtils.logAll(LOG, "Unable to close statement", ex);
+        }
+      }
+      try {
+        conn.close();
+      } catch (SQLException ex) {
+        LoggingUtils.logAll(LOG, "Unable to close connection", ex);
+      }
+    }
+  }
+
+
+  private class ReadThread extends Thread {
+    private InputStream in;
+
+    ReadThread(InputStream in) {
+      this.in = in;
+    }
+
+    public void run() {
+      BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+      String line = null;
+      try {
+        while((line = reader.readLine()) != null) {
+          System.out.println(line);
+        }
+        reader.close();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java
new file mode 100644
index 0000000..3dc05a7
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.postgresql;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.sqoop.mapreduce.AutoProgressReducer;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.LoggingUtils;
+
+
+/**
+ * Reducer for transfering data from temporary table to destination.
+ * Reducer drops all temporary tables if all data successfully transfered.
+ * Temporary tables is not dropptd in error case for manual retry.
+ */
+public class PGBulkloadExportReducer
+    extends AutoProgressReducer<LongWritable, Text,
+                                NullWritable, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(PGBulkloadExportReducer.class.getName());
+  private Configuration conf;
+  private DBConfiguration dbConf;
+  private Connection conn;
+  private String tableName;
+
+
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    conf = context.getConfiguration();
+    dbConf = new DBConfiguration(conf);
+    tableName = dbConf.getOutputTableName();
+    try {
+      conn = dbConf.getConnection();
+      conn.setAutoCommit(false);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Unable to load JDBC driver class", ex);
+      throw new IOException(ex);
+    } catch (SQLException ex) {
+      LoggingUtils.logAll(LOG, "Unable to connect to database", ex);
+      throw new IOException(ex);
+    }
+  }
+
+
+  @Override
+    public void reduce(LongWritable key, Iterable<Text> values, Context context)
+    throws IOException, InterruptedException {
+    Statement stmt = null;
+    try {
+      stmt = conn.createStatement();
+      for (Text value : values) {
+        int inserted = stmt.executeUpdate("INSERT INTO " + tableName
+                                          + " ( SELECT * FROM " + value + " )");
+        stmt.executeUpdate("DROP TABLE " + value);
+      }
+      conn.commit();
+    } catch (SQLException ex) {
+      LoggingUtils.logAll(LOG, "Unable to execute create query.", ex);
+      throw new IOException(ex);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LoggingUtils.logAll(LOG, "Unable to close statement", ex);
+        }
+      }
+    }
+  }
+
+
+  protected void cleanup(Context context)
+    throws IOException, InterruptedException {
+    try {
+      conn.close();
+    } catch (SQLException ex) {
+      LoggingUtils.logAll(LOG, "Unable to load JDBC driver class", ex);
+      throw new IOException(ex);
+    }
+  }
+
+}