Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/07/10 04:46:18 UTC
git commit: SQOOP-1118: Move PostgreSQL-specific MR code to org.apache.sqoop.mapreduce.postgresql
Updated Branches:
refs/heads/trunk fb29b8f9f -> b8fd60202
SQOOP-1118: Move PostgreSQL-specific MR code to org.apache.sqoop.mapreduce.postgresql
(Masatake Iwasaki via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b8fd6020
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b8fd6020
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b8fd6020
Branch: refs/heads/trunk
Commit: b8fd60202865fa5c58c61361c45496e01bf05456
Parents: fb29b8f
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Jul 9 19:45:32 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Jul 9 19:45:32 2013 -0700
----------------------------------------------------------------------
.../apache/sqoop/manager/PGBulkloadManager.java | 2 +-
.../sqoop/mapreduce/PGBulkloadExportJob.java | 209 -------------
.../sqoop/mapreduce/PGBulkloadExportMapper.java | 310 ------------------
.../mapreduce/PGBulkloadExportReducer.java | 107 -------
.../postgresql/PGBulkloadExportJob.java | 210 +++++++++++++
.../postgresql/PGBulkloadExportMapper.java | 311 +++++++++++++++++++
.../postgresql/PGBulkloadExportReducer.java | 108 +++++++
7 files changed, 630 insertions(+), 627 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
index 091fd15..04e1443 100644
--- a/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
+++ b/src/java/org/apache/sqoop/manager/PGBulkloadManager.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sqoop.mapreduce.ExportInputFormat;
-import org.apache.sqoop.mapreduce.PGBulkloadExportJob;
+import org.apache.sqoop.mapreduce.postgresql.PGBulkloadExportJob;
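
For context, this one-line import update is the only change PGBulkloadManager needs; the relocated job class does the rest. A minimal sketch of the call site, assuming the usual manager wiring (the exportTable() body is not part of this diff, so treat the details as illustrative):

    // Illustrative only: how PGBulkloadManager typically hands off to the job.
    // ExportInputFormat and NullOutputFormat match the imports shown above;
    // the mapper argument is null because the job supplies its own mapper.
    public void exportTable(ExportJobContext context)
        throws IOException, ExportException {
      context.setConnManager(this);
      PGBulkloadExportJob exportJob =
          new PGBulkloadExportJob(context, null,
                                  ExportInputFormat.class,
                                  NullOutputFormat.class);
      exportJob.runExport();
    }
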
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
deleted file mode 100644
index cc60233..0000000
--- a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportJob.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce;
-
-import java.io.IOException;
-import com.cloudera.sqoop.manager.ExportJobContext;
-import com.cloudera.sqoop.util.ExportException;
-import com.cloudera.sqoop.SqoopOptions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.config.ConfigurationHelper;
-import org.apache.sqoop.lib.DelimiterSet;
-import org.apache.sqoop.manager.ConnManager;
-import org.apache.sqoop.mapreduce.db.DBConfiguration;
-import org.apache.sqoop.orm.TableClassName;
-
-
-/**
- * Class that runs an export job using pg_bulkload in the mapper.
- */
-public class PGBulkloadExportJob extends ExportJobBase {
-
- public static final Log LOG =
- LogFactory.getLog(PGBulkloadExportJob.class.getName());
-
-
- public PGBulkloadExportJob(final ExportJobContext context) {
- super(context);
- }
-
-
- public PGBulkloadExportJob(final ExportJobContext ctxt,
- final Class<? extends Mapper> mapperClass,
- final Class<? extends InputFormat> inputFormatClass,
- final Class<? extends OutputFormat> outputFormatClass) {
- super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
- }
-
-
- @Override
- protected void configureInputFormat(Job job, String tableName,
- String tableClassName, String splitByCol)
- throws ClassNotFoundException, IOException {
- super.configureInputFormat(job, tableName, tableClassName, splitByCol);
- ConnManager mgr = context.getConnManager();
- String username = options.getUsername();
- if (null == username || username.length() == 0) {
- DBConfiguration.configureDB(job.getConfiguration(),
- mgr.getDriverClass(),
- options.getConnectString(),
- options.getFetchSize(),
- options.getConnectionParams());
- } else {
- DBConfiguration.configureDB(job.getConfiguration(),
- mgr.getDriverClass(),
- options.getConnectString(),
- username, options.getPassword(),
- options.getFetchSize(),
- options.getConnectionParams());
- }
- }
-
-
- @Override
- protected Class<? extends Mapper> getMapperClass() {
- return PGBulkloadExportMapper.class;
- }
-
-
- protected Class<? extends Reducer> getReducerClass() {
- return PGBulkloadExportReducer.class;
- }
-
-
- private void setDelimiter(String prop, char val, Configuration conf) {
- switch (val) {
- case DelimiterSet.NULL_CHAR:
- break;
- case '\t':
- default:
- conf.set(prop, String.valueOf(val));
- }
- }
-
-
- @Override
- protected void propagateOptionsToJob(Job job) {
- super.propagateOptionsToJob(job);
- SqoopOptions opts = context.getOptions();
- Configuration conf = job.getConfiguration();
- conf.setIfUnset("pgbulkload.bin", "pg_bulkload");
- if (opts.getNullStringValue() != null) {
- conf.set("pgbulkload.null.string", opts.getNullStringValue());
- }
- setDelimiter("pgbulkload.input.field.delim",
- opts.getInputFieldDelim(),
- conf);
- setDelimiter("pgbulkload.input.record.delim",
- opts.getInputRecordDelim(),
- conf);
- setDelimiter("pgbulkload.input.enclosedby",
- opts.getInputEnclosedBy(),
- conf);
- setDelimiter("pgbulkload.input.escapedby",
- opts.getInputEscapedBy(),
- conf);
- conf.setBoolean("pgbulkload.input.encloserequired",
- opts.isInputEncloseRequired());
- conf.setIfUnset("pgbulkload.check.constraints", "YES");
- conf.setIfUnset("pgbulkload.parse.errors", "INFINITE");
- conf.setIfUnset("pgbulkload.duplicate.errors", "INFINITE");
- conf.set("mapred.jar", context.getJarFile());
- conf.setBoolean("mapred.map.tasks.speculative.execution", false);
- conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
- conf.setInt("mapred.map.max.attempts", 1);
- conf.setInt("mapred.reduce.max.attempts", 1);
- conf.setIfUnset("mapred.reduce.tasks", "1");
- if (context.getOptions().doClearStagingTable()) {
- conf.setBoolean("pgbulkload.clear.staging.table", true);
- }
- }
-
-
- @Override
- public void runExport() throws ExportException, IOException {
- ConnManager cmgr = context.getConnManager();
- SqoopOptions options = context.getOptions();
- Configuration conf = options.getConf();
- DBConfiguration dbConf = null;
- String outputTableName = context.getTableName();
- String tableName = outputTableName;
- String tableClassName =
- new TableClassName(options).getClassForTable(outputTableName);
-
- LOG.info("Beginning export of " + outputTableName);
- loadJars(conf, context.getJarFile(), tableClassName);
-
- try {
- Job job = new Job(conf);
- dbConf = new DBConfiguration(job.getConfiguration());
- dbConf.setOutputTableName(tableName);
- configureInputFormat(job, tableName, tableClassName, null);
- configureOutputFormat(job, tableName, tableClassName);
- configureNumTasks(job);
- propagateOptionsToJob(job);
- job.setMapperClass(getMapperClass());
- job.setMapOutputKeyClass(LongWritable.class);
- job.setMapOutputValueClass(Text.class);
- job.setReducerClass(getReducerClass());
- cacheJars(job, context.getConnManager());
- setJob(job);
-
- boolean success = runJob(job);
- if (!success) {
- throw new ExportException("Export job failed!");
- }
- } catch (InterruptedException ie) {
- throw new IOException(ie);
- } catch (ClassNotFoundException cnfe) {
- throw new IOException(cnfe);
- } finally {
- unloadJars();
- }
- }
-
-
- @Override
- protected int configureNumTasks(Job job) throws IOException {
- SqoopOptions options = context.getOptions();
- int numMapTasks = options.getNumMappers();
- if (numMapTasks < 1) {
- numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
- LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
- }
-
- ConfigurationHelper.setJobNumMaps(job, numMapTasks);
- return numMapTasks;
- }
-
-
- private void clearStagingTable(DBConfiguration dbConf, String tableName)
- throws IOException {
- // Clearing the staging table is done by each mapper task.
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java
deleted file mode 100644
index 81b1333..0000000
--- a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportMapper.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.sqoop.lib.SqoopRecord;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.sqoop.mapreduce.db.DBConfiguration;
-import org.apache.sqoop.util.LoggingUtils;
-import org.apache.sqoop.util.PostgreSQLUtils;
-import org.apache.sqoop.util.Executor;
-import org.apache.sqoop.util.JdbcUrl;
-
-
-/**
- * Mapper that starts a 'pg_bulkload' process and uses that to export rows from
- * HDFS to a PostgreSQL database at high speed.
- *
- * map() methods are actually provided by subclasses that read from
- * SequenceFiles (containing existing SqoopRecords) or text files
- * (containing delimited lines) and deliver these results to the stream
- * used to interface with pg_bulkload.
- */
-public class PGBulkloadExportMapper
- extends AutoProgressMapper<LongWritable, Writable, LongWritable, Text> {
- private Configuration conf;
- private DBConfiguration dbConf;
- private Process process;
- private OutputStream out;
- protected BufferedWriter writer;
- private Thread thread;
- protected String tmpTableName;
- private String tableName;
- private String passwordFilename;
-
-
- public PGBulkloadExportMapper() {
- }
-
-
- protected void setup(Context context)
- throws IOException, InterruptedException {
- super.setup(context);
- conf = context.getConfiguration();
- dbConf = new DBConfiguration(conf);
- tableName = dbConf.getOutputTableName();
- tmpTableName = tableName + "_" + context.getTaskAttemptID().toString();
-
- Connection conn = null;
- try {
- conn = dbConf.getConnection();
- conn.setAutoCommit(false);
- if (conf.getBoolean("pgbulkload.clear.staging.table", false)) {
- StringBuffer query = new StringBuffer();
- query.append("DROP TABLE IF EXISTS ");
- query.append(tmpTableName);
- doExecuteUpdate(query.toString());
- }
- StringBuffer query = new StringBuffer();
- query.append("CREATE TABLE ");
- query.append(tmpTableName);
- query.append("(LIKE ");
- query.append(tableName);
- query.append(" INCLUDING CONSTRAINTS)");
- if (conf.get("pgbulkload.staging.tablespace") != null) {
- query.append("TABLESPACE ");
- query.append(conf.get("pgbulkload.staging.tablespace"));
- }
- doExecuteUpdate(query.toString());
- conn.commit();
- } catch (ClassNotFoundException ex) {
- LOG.error("Unable to load JDBC driver class", ex);
- throw new IOException(ex);
- } catch (SQLException ex) {
- LoggingUtils.logAll(LOG, "Unable to execute statement", ex);
- throw new IOException(ex);
- } finally {
- try {
- conn.close();
- } catch (SQLException ex) {
- LoggingUtils.logAll(LOG, "Unable to close connection", ex);
- }
- }
-
- try {
- ArrayList<String> args = new ArrayList<String>();
- List<String> envp = Executor.getCurEnvpStrings();
- args.add(conf.get("pgbulkload.bin", "pg_bulkload"));
- args.add("--username="
- + conf.get(DBConfiguration.USERNAME_PROPERTY));
- args.add("--dbname="
- + JdbcUrl.getDatabaseName(conf.get(DBConfiguration.URL_PROPERTY)));
- args.add("--host="
- + JdbcUrl.getHostName(conf.get(DBConfiguration.URL_PROPERTY)));
- args.add("--port="
- + JdbcUrl.getPort(conf.get(DBConfiguration.URL_PROPERTY)));
- args.add("--input=stdin");
- args.add("--output=" + tmpTableName);
- args.add("-o");
- args.add("TYPE=CSV");
- args.add("-o");
- args.add("DELIMITER=" + conf.get("pgbulkload.input.field.delim", ","));
- args.add("-o");
- args.add("QUOTE=" + conf.get("pgbulkload.input.enclosedby", "\""));
- args.add("-o");
- args.add("ESCAPE=" + conf.get("pgbulkload.input.escapedby", "\""));
- args.add("-o");
- args.add("CHECK_CONSTRAINTS=" + conf.get("pgbulkload.check.constraints"));
- args.add("-o");
- args.add("PARSE_ERRORS=" + conf.get("pgbulkload.parse.errors"));
- args.add("-o");
- args.add("DUPLICATE_ERRORS=" + conf.get("pgbulkload.duplicate.errors"));
- if (conf.get("pgbulkload.null.string") != null) {
- args.add("-o");
- args.add("NULL=" + conf.get("pgbulkload.null.string"));
- }
- if (conf.get("pgbulkload.filter") != null) {
- args.add("-o");
- args.add("FILTER=" + conf.get("pgbulkload.filter"));
- }
- LOG.debug("Starting pg_bulkload with arguments:");
- for (String arg : args) {
- LOG.debug(" " + arg);
- }
- if (conf.get(DBConfiguration.PASSWORD_PROPERTY) != null) {
- String tmpDir = System.getProperty("test.build.data", "/tmp/");
- if (!tmpDir.endsWith(File.separator)) {
- tmpDir = tmpDir + File.separator;
- }
- tmpDir = conf.get("job.local.dir", tmpDir);
- passwordFilename = PostgreSQLUtils.writePasswordFile(tmpDir,
- conf.get(DBConfiguration.PASSWORD_PROPERTY));
- envp.add("PGPASSFILE=" + passwordFilename);
- }
- process = Runtime.getRuntime().exec(args.toArray(new String[0]),
- envp.toArray(new String[0]));
- out = process.getOutputStream();
- writer = new BufferedWriter(new OutputStreamWriter(out));
- thread = new ReadThread(process.getErrorStream());
- thread.start();
- } catch (Exception e) {
- LOG.error("Can't start up pg_bulkload process", e);
- cleanup(context);
- doExecuteUpdate("DROP TABLE " + tmpTableName);
- throw new IOException(e);
- }
- }
-
-
- public void map(LongWritable key, Writable value, Context context)
- throws IOException, InterruptedException {
- try {
- String str = value.toString();
- if (value instanceof Text) {
- writer.write(str, 0, str.length());
- writer.newLine();
- } else if (value instanceof SqoopRecord) {
- writer.write(str, 0, str.length());
- }
- } catch (Exception e) {
- doExecuteUpdate("DROP TABLE " + tmpTableName);
- cleanup(context);
- throw new IOException(e);
- }
- }
-
-
- protected void cleanup(Context context)
- throws IOException, InterruptedException {
- LongWritable taskid =
- new LongWritable(context.getTaskAttemptID().getTaskID().getId());
- context.write(taskid, new Text(tmpTableName));
-
- if (writer != null) {
- writer.close();
- }
- if (out != null) {
- out.close();
- }
- try {
- if (thread != null) {
- thread.join();
- }
- } finally {
- // block until the process is done.
- if (null != process) {
- while (true) {
- try {
- int returnValue = process.waitFor();
-
- // Check pg_bulkload's process return value
- if (returnValue != 0) {
- throw new RuntimeException(
- "Unexpected return value from pg_bulkload: "+ returnValue);
- }
- } catch (InterruptedException ie) {
- // interrupted; loop around.
- LOG.debug("Caught interrupted exception waiting for process "
- + "pg_bulkload.bin to exit");
- //Clear the interrupted flag. We have to call Thread.interrupted
- //to clear for interrupted exceptions from process.waitFor
- //See http://bugs.sun.com/view_bug.do?bug_id=6420270 for more info
- Thread.interrupted();
- continue;
- }
- break;
- }
- }
- }
- if (null != passwordFilename) {
- if (!new File(passwordFilename).delete()) {
- LOG.error("Could not remove postgresql password file "
- + passwordFilename);
- LOG.error("You should remove this file to protect your credentials.");
- }
- }
- }
-
-
- protected int doExecuteUpdate(String query) throws IOException {
- Connection conn = null;
- try {
- conn = dbConf.getConnection();
- conn.setAutoCommit(false);
- } catch (ClassNotFoundException ex) {
- LOG.error("Unable to load JDBC driver class", ex);
- throw new IOException(ex);
- } catch (SQLException ex) {
- LoggingUtils.logAll(LOG, "Unable to connect to database", ex);
- throw new IOException(ex);
- }
- Statement stmt = null;
- try {
- stmt = conn.createStatement();
- int ret = stmt.executeUpdate(query);
- conn.commit();
- return ret;
- } catch (SQLException ex) {
- LoggingUtils.logAll(LOG, "Unable to execute query: " + query, ex);
- throw new IOException(ex);
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException ex) {
- LoggingUtils.logAll(LOG, "Unable to close statement", ex);
- }
- }
- try {
- conn.close();
- } catch (SQLException ex) {
- LoggingUtils.logAll(LOG, "Unable to close connection", ex);
- }
- }
- }
-
-
- private class ReadThread extends Thread {
- private InputStream in;
-
- ReadThread(InputStream in) {
- this.in = in;
- }
-
- public void run() {
- BufferedReader reader = new BufferedReader(new InputStreamReader(in));
- String line = null;
- try {
- while((line = reader.readLine()) != null) {
- System.out.println(line);
- }
- reader.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java b/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java
deleted file mode 100644
index 6f55861..0000000
--- a/src/java/org/apache/sqoop/mapreduce/PGBulkloadExportReducer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.sqoop.mapreduce.db.DBConfiguration;
-import org.apache.sqoop.util.LoggingUtils;
-
-
-/**
- * Reducer that transfers data from the temporary tables to the destination table.
- * It drops each temporary table once its data has been transferred successfully.
- * Temporary tables are not dropped on error, so a manual retry is possible.
- */
-public class PGBulkloadExportReducer
- extends AutoProgressReducer<LongWritable, Text,
- NullWritable, NullWritable> {
-
- public static final Log LOG =
- LogFactory.getLog(PGBulkloadExportReducer.class.getName());
- private Configuration conf;
- private DBConfiguration dbConf;
- private Connection conn;
- private String tableName;
-
-
- protected void setup(Context context)
- throws IOException, InterruptedException {
- conf = context.getConfiguration();
- dbConf = new DBConfiguration(conf);
- tableName = dbConf.getOutputTableName();
- try {
- conn = dbConf.getConnection();
- conn.setAutoCommit(false);
- } catch (ClassNotFoundException ex) {
- LOG.error("Unable to load JDBC driver class", ex);
- throw new IOException(ex);
- } catch (SQLException ex) {
- LoggingUtils.logAll(LOG, "Unable to connect to database", ex);
- throw new IOException(ex);
- }
- }
-
-
- @Override
- public void reduce(LongWritable key, Iterable<Text> values, Context context)
- throws IOException, InterruptedException {
- Statement stmt = null;
- try {
- stmt = conn.createStatement();
- for (Text value : values) {
- int inserted = stmt.executeUpdate("INSERT INTO " + tableName
- + " ( SELECT * FROM " + value + " )");
- stmt.executeUpdate("DROP TABLE " + value);
- }
- conn.commit();
- } catch (SQLException ex) {
- LoggingUtils.logAll(LOG, "Unable to execute create query.", ex);
- throw new IOException(ex);
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException ex) {
- LoggingUtils.logAll(LOG, "Unable to close statement", ex);
- }
- }
- }
- }
-
-
- protected void cleanup(Context context)
- throws IOException, InterruptedException {
- try {
- conn.close();
- } catch (SQLException ex) {
- LoggingUtils.logAll(LOG, "Unable to load JDBC driver class", ex);
- throw new IOException(ex);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
new file mode 100644
index 0000000..79fb7da
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.postgresql;
+
+import java.io.IOException;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.SqoopOptions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.config.ConfigurationHelper;
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.mapreduce.ExportJobBase;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.orm.TableClassName;
+
+
+/**
+ * Class that runs an export job using pg_bulkload in the mapper.
+ */
+public class PGBulkloadExportJob extends ExportJobBase {
+
+ public static final Log LOG =
+ LogFactory.getLog(PGBulkloadExportJob.class.getName());
+
+
+ public PGBulkloadExportJob(final ExportJobContext context) {
+ super(context);
+ }
+
+
+ public PGBulkloadExportJob(final ExportJobContext ctxt,
+ final Class<? extends Mapper> mapperClass,
+ final Class<? extends InputFormat> inputFormatClass,
+ final Class<? extends OutputFormat> outputFormatClass) {
+ super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+ }
+
+
+ @Override
+ protected void configureInputFormat(Job job, String tableName,
+ String tableClassName, String splitByCol)
+ throws ClassNotFoundException, IOException {
+ super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+ ConnManager mgr = context.getConnManager();
+ String username = options.getUsername();
+ if (null == username || username.length() == 0) {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(),
+ options.getConnectString(),
+ options.getFetchSize(),
+ options.getConnectionParams());
+ } else {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(),
+ options.getConnectString(),
+ username, options.getPassword(),
+ options.getFetchSize(),
+ options.getConnectionParams());
+ }
+ }
+
+
+ @Override
+ protected Class<? extends Mapper> getMapperClass() {
+ return PGBulkloadExportMapper.class;
+ }
+
+
+ protected Class<? extends Reducer> getReducerClass() {
+ return PGBulkloadExportReducer.class;
+ }
+
+
+ private void setDelimiter(String prop, char val, Configuration conf) {
+ switch (val) {
+ case DelimiterSet.NULL_CHAR:
+ break;
+ case '\t':
+ default:
+ conf.set(prop, String.valueOf(val));
+ }
+ }
+
+
+ @Override
+ protected void propagateOptionsToJob(Job job) {
+ super.propagateOptionsToJob(job);
+ SqoopOptions opts = context.getOptions();
+ Configuration conf = job.getConfiguration();
+ conf.setIfUnset("pgbulkload.bin", "pg_bulkload");
+ if (opts.getNullStringValue() != null) {
+ conf.set("pgbulkload.null.string", opts.getNullStringValue());
+ }
+ setDelimiter("pgbulkload.input.field.delim",
+ opts.getInputFieldDelim(),
+ conf);
+ setDelimiter("pgbulkload.input.record.delim",
+ opts.getInputRecordDelim(),
+ conf);
+ setDelimiter("pgbulkload.input.enclosedby",
+ opts.getInputEnclosedBy(),
+ conf);
+ setDelimiter("pgbulkload.input.escapedby",
+ opts.getInputEscapedBy(),
+ conf);
+ conf.setBoolean("pgbulkload.input.encloserequired",
+ opts.isInputEncloseRequired());
+ conf.setIfUnset("pgbulkload.check.constraints", "YES");
+ conf.setIfUnset("pgbulkload.parse.errors", "INFINITE");
+ conf.setIfUnset("pgbulkload.duplicate.errors", "INFINITE");
+ conf.set("mapred.jar", context.getJarFile());
+ conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+ conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+ conf.setInt("mapred.map.max.attempts", 1);
+ conf.setInt("mapred.reduce.max.attempts", 1);
+ conf.setIfUnset("mapred.reduce.tasks", "1");
+ if (context.getOptions().doClearStagingTable()) {
+ conf.setBoolean("pgbulkload.clear.staging.table", true);
+ }
+ }
+
+
+ @Override
+ public void runExport() throws ExportException, IOException {
+ ConnManager cmgr = context.getConnManager();
+ SqoopOptions options = context.getOptions();
+ Configuration conf = options.getConf();
+ DBConfiguration dbConf = null;
+ String outputTableName = context.getTableName();
+ String tableName = outputTableName;
+ String tableClassName =
+ new TableClassName(options).getClassForTable(outputTableName);
+
+ LOG.info("Beginning export of " + outputTableName);
+ loadJars(conf, context.getJarFile(), tableClassName);
+
+ try {
+ Job job = new Job(conf);
+ dbConf = new DBConfiguration(job.getConfiguration());
+ dbConf.setOutputTableName(tableName);
+ configureInputFormat(job, tableName, tableClassName, null);
+ configureOutputFormat(job, tableName, tableClassName);
+ configureNumTasks(job);
+ propagateOptionsToJob(job);
+ job.setMapperClass(getMapperClass());
+ job.setMapOutputKeyClass(LongWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setReducerClass(getReducerClass());
+ cacheJars(job, context.getConnManager());
+ setJob(job);
+
+ boolean success = runJob(job);
+ if (!success) {
+ throw new ExportException("Export job failed!");
+ }
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ } finally {
+ unloadJars();
+ }
+ }
+
+
+ @Override
+ protected int configureNumTasks(Job job) throws IOException {
+ SqoopOptions options = context.getOptions();
+ int numMapTasks = options.getNumMappers();
+ if (numMapTasks < 1) {
+ numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
+ LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
+ }
+
+ ConfigurationHelper.setJobNumMaps(job, numMapTasks);
+ return numMapTasks;
+ }
+
+
+ private void clearStagingTable(DBConfiguration dbConf, String tableName)
+ throws IOException {
+ // Clearing the staging table is done by each mapper task.
+ }
+}
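
Note that propagateOptionsToJob() above applies the pgbulkload.* keys with setIfUnset(), so each of them can be overridden per run with -D generic options. An illustrative invocation (connect string, table, and paths are placeholders):

    sqoop export \
        -Dpgbulkload.bin=/usr/local/bin/pg_bulkload \
        -Dpgbulkload.check.constraints=NO \
        --connect jdbc:postgresql://localhost/testdb \
        --username sqoop \
        --table mytable \
        --export-dir /user/sqoop/mytable \
        --connection-manager org.apache.sqoop.manager.PGBulkloadManager
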
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java
new file mode 100644
index 0000000..333546f
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportMapper.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.postgresql;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.LoggingUtils;
+import org.apache.sqoop.util.PostgreSQLUtils;
+import org.apache.sqoop.util.Executor;
+import org.apache.sqoop.util.JdbcUrl;
+
+
+/**
+ * Mapper that starts a 'pg_bulkload' process and uses that to export rows from
+ * HDFS to a PostgreSQL database at high speed.
+ *
+ * map() methods are actually provided by subclasses that read from
+ * SequenceFiles (containing existing SqoopRecords) or text files
+ * (containing delimited lines) and deliver these results to the stream
+ * used to interface with pg_bulkload.
+ */
+public class PGBulkloadExportMapper
+ extends AutoProgressMapper<LongWritable, Writable, LongWritable, Text> {
+ private Configuration conf;
+ private DBConfiguration dbConf;
+ private Process process;
+ private OutputStream out;
+ protected BufferedWriter writer;
+ private Thread thread;
+ protected String tmpTableName;
+ private String tableName;
+ private String passwordFilename;
+
+
+ public PGBulkloadExportMapper() {
+ }
+
+
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ super.setup(context);
+ conf = context.getConfiguration();
+ dbConf = new DBConfiguration(conf);
+ tableName = dbConf.getOutputTableName();
+ tmpTableName = tableName + "_" + context.getTaskAttemptID().toString();
+
+ Connection conn = null;
+ try {
+ conn = dbConf.getConnection();
+ conn.setAutoCommit(false);
+ if (conf.getBoolean("pgbulkload.clear.staging.table", false)) {
+ StringBuffer query = new StringBuffer();
+ query.append("DROP TABLE IF EXISTS ");
+ query.append(tmpTableName);
+ doExecuteUpdate(query.toString());
+ }
+ StringBuffer query = new StringBuffer();
+ query.append("CREATE TABLE ");
+ query.append(tmpTableName);
+ query.append("(LIKE ");
+ query.append(tableName);
+ query.append(" INCLUDING CONSTRAINTS)");
+ if (conf.get("pgbulkload.staging.tablespace") != null) {
+ query.append("TABLESPACE ");
+ query.append(conf.get("pgbulkload.staging.tablespace"));
+ }
+ doExecuteUpdate(query.toString());
+ conn.commit();
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Unable to load JDBC driver class", ex);
+ throw new IOException(ex);
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to execute statement", ex);
+ throw new IOException(ex);
+ } finally {
+ try {
+ conn.close();
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to close connection", ex);
+ }
+ }
+
+ try {
+ ArrayList<String> args = new ArrayList<String>();
+ List<String> envp = Executor.getCurEnvpStrings();
+ args.add(conf.get("pgbulkload.bin", "pg_bulkload"));
+ args.add("--username="
+ + conf.get(DBConfiguration.USERNAME_PROPERTY));
+ args.add("--dbname="
+ + JdbcUrl.getDatabaseName(conf.get(DBConfiguration.URL_PROPERTY)));
+ args.add("--host="
+ + JdbcUrl.getHostName(conf.get(DBConfiguration.URL_PROPERTY)));
+ args.add("--port="
+ + JdbcUrl.getPort(conf.get(DBConfiguration.URL_PROPERTY)));
+ args.add("--input=stdin");
+ args.add("--output=" + tmpTableName);
+ args.add("-o");
+ args.add("TYPE=CSV");
+ args.add("-o");
+ args.add("DELIMITER=" + conf.get("pgbulkload.input.field.delim", ","));
+ args.add("-o");
+ args.add("QUOTE=" + conf.get("pgbulkload.input.enclosedby", "\""));
+ args.add("-o");
+ args.add("ESCAPE=" + conf.get("pgbulkload.input.escapedby", "\""));
+ args.add("-o");
+ args.add("CHECK_CONSTRAINTS=" + conf.get("pgbulkload.check.constraints"));
+ args.add("-o");
+ args.add("PARSE_ERRORS=" + conf.get("pgbulkload.parse.errors"));
+ args.add("-o");
+ args.add("DUPLICATE_ERRORS=" + conf.get("pgbulkload.duplicate.errors"));
+ if (conf.get("pgbulkload.null.string") != null) {
+ args.add("-o");
+ args.add("NULL=" + conf.get("pgbulkload.null.string"));
+ }
+ if (conf.get("pgbulkload.filter") != null) {
+ args.add("-o");
+ args.add("FILTER=" + conf.get("pgbulkload.filter"));
+ }
+ LOG.debug("Starting pg_bulkload with arguments:");
+ for (String arg : args) {
+ LOG.debug(" " + arg);
+ }
+ if (conf.get(DBConfiguration.PASSWORD_PROPERTY) != null) {
+ String tmpDir = System.getProperty("test.build.data", "/tmp/");
+ if (!tmpDir.endsWith(File.separator)) {
+ tmpDir = tmpDir + File.separator;
+ }
+ tmpDir = conf.get("job.local.dir", tmpDir);
+ passwordFilename = PostgreSQLUtils.writePasswordFile(tmpDir,
+ conf.get(DBConfiguration.PASSWORD_PROPERTY));
+ envp.add("PGPASSFILE=" + passwordFilename);
+ }
+ process = Runtime.getRuntime().exec(args.toArray(new String[0]),
+ envp.toArray(new String[0]));
+ out = process.getOutputStream();
+ writer = new BufferedWriter(new OutputStreamWriter(out));
+ thread = new ReadThread(process.getErrorStream());
+ thread.start();
+ } catch (Exception e) {
+ LOG.error("Can't start up pg_bulkload process", e);
+ cleanup(context);
+ doExecuteUpdate("DROP TABLE " + tmpTableName);
+ throw new IOException(e);
+ }
+ }
+
+
+ public void map(LongWritable key, Writable value, Context context)
+ throws IOException, InterruptedException {
+ try {
+ String str = value.toString();
+ if (value instanceof Text) {
+ writer.write(str, 0, str.length());
+ writer.newLine();
+ } else if (value instanceof SqoopRecord) {
+ writer.write(str, 0, str.length());
+ }
+ } catch (Exception e) {
+ doExecuteUpdate("DROP TABLE " + tmpTableName);
+ cleanup(context);
+ throw new IOException(e);
+ }
+ }
+
+
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ LongWritable taskid =
+ new LongWritable(context.getTaskAttemptID().getTaskID().getId());
+ context.write(taskid, new Text(tmpTableName));
+
+ if (writer != null) {
+ writer.close();
+ }
+ if (out != null) {
+ out.close();
+ }
+ try {
+ if (thread != null) {
+ thread.join();
+ }
+ } finally {
+ // block until the process is done.
+ if (null != process) {
+ while (true) {
+ try {
+ int returnValue = process.waitFor();
+
+ // Check pg_bulkload's process return value
+ if (returnValue != 0) {
+ throw new RuntimeException(
+ "Unexpected return value from pg_bulkload: "+ returnValue);
+ }
+ } catch (InterruptedException ie) {
+ // interrupted; loop around.
+ LOG.debug("Caught interrupted exception waiting for process "
+ + "pg_bulkload.bin to exit");
+ //Clear the interrupted flag. We have to call Thread.interrupted
+ //to clear for interrupted exceptions from process.waitFor
+ //See http://bugs.sun.com/view_bug.do?bug_id=6420270 for more info
+ Thread.interrupted();
+ continue;
+ }
+ break;
+ }
+ }
+ }
+ if (null != passwordFilename) {
+ if (!new File(passwordFilename).delete()) {
+ LOG.error("Could not remove postgresql password file "
+ + passwordFilename);
+ LOG.error("You should remove this file to protect your credentials.");
+ }
+ }
+ }
+
+
+ protected int doExecuteUpdate(String query) throws IOException {
+ Connection conn = null;
+ try {
+ conn = dbConf.getConnection();
+ conn.setAutoCommit(false);
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Unable to load JDBC driver class", ex);
+ throw new IOException(ex);
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to connect to database", ex);
+ throw new IOException(ex);
+ }
+ Statement stmt = null;
+ try {
+ stmt = conn.createStatement();
+ int ret = stmt.executeUpdate(query);
+ conn.commit();
+ return ret;
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to execute query: " + query, ex);
+ throw new IOException(ex);
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to close statement", ex);
+ }
+ }
+ try {
+ conn.close();
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to close connection", ex);
+ }
+ }
+ }
+
+
+ private class ReadThread extends Thread {
+ private InputStream in;
+
+ ReadThread(InputStream in) {
+ this.in = in;
+ }
+
+ public void run() {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ String line = null;
+ try {
+ while((line = reader.readLine()) != null) {
+ System.out.println(line);
+ }
+ reader.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
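
Putting the argument list in setup() together, the process each map task launches resembles the following (host, table, and option values are illustrative; PGPASSFILE is added to the environment only when a password is configured, and no shell is involved since Runtime.exec() receives the argument array directly):

    pg_bulkload --username=sqoop --dbname=testdb --host=localhost --port=5432 \
        --input=stdin --output=mytable_attempt_201307100446_0001_m_000000_0 \
        -o TYPE=CSV -o DELIMITER=, -o 'QUOTE="' -o 'ESCAPE="' \
        -o CHECK_CONSTRAINTS=YES -o PARSE_ERRORS=INFINITE -o DUPLICATE_ERRORS=INFINITE
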
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b8fd6020/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java
new file mode 100644
index 0000000..3dc05a7
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportReducer.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.postgresql;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.sqoop.mapreduce.AutoProgressReducer;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.LoggingUtils;
+
+
+/**
+ * Reducer that transfers data from the temporary tables to the destination table.
+ * It drops each temporary table once its data has been transferred successfully.
+ * Temporary tables are not dropped on error, so a manual retry is possible.
+ */
+public class PGBulkloadExportReducer
+ extends AutoProgressReducer<LongWritable, Text,
+ NullWritable, NullWritable> {
+
+ public static final Log LOG =
+ LogFactory.getLog(PGBulkloadExportReducer.class.getName());
+ private Configuration conf;
+ private DBConfiguration dbConf;
+ private Connection conn;
+ private String tableName;
+
+
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ conf = context.getConfiguration();
+ dbConf = new DBConfiguration(conf);
+ tableName = dbConf.getOutputTableName();
+ try {
+ conn = dbConf.getConnection();
+ conn.setAutoCommit(false);
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Unable to load JDBC driver class", ex);
+ throw new IOException(ex);
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to connect to database", ex);
+ throw new IOException(ex);
+ }
+ }
+
+
+ @Override
+ public void reduce(LongWritable key, Iterable<Text> values, Context context)
+ throws IOException, InterruptedException {
+ Statement stmt = null;
+ try {
+ stmt = conn.createStatement();
+ for (Text value : values) {
+ int inserted = stmt.executeUpdate("INSERT INTO " + tableName
+ + " ( SELECT * FROM " + value + " )");
+ stmt.executeUpdate("DROP TABLE " + value);
+ }
+ conn.commit();
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to execute create query.", ex);
+ throw new IOException(ex);
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to close statement", ex);
+ }
+ }
+ }
+ }
+
+
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ try {
+ conn.close();
+ } catch (SQLException ex) {
+ LoggingUtils.logAll(LOG, "Unable to load JDBC driver class", ex);
+ throw new IOException(ex);
+ }
+ }
+
+}
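
For each staging table name received from the mappers, reduce() above issues the equivalent of the following two statements, committing only after every staging table has been drained (table names are illustrative):

    INSERT INTO mytable ( SELECT * FROM mytable_attempt_201307100446_0001_m_000000_0 );
    DROP TABLE mytable_attempt_201307100446_0001_m_000000_0;
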