You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2012/11/04 17:04:07 UTC
git commit: SQOOP-655: Generic JDBC connector for export
Updated Branches:
refs/heads/sqoop2 2481b7f8d -> 0976713f0
SQOOP-655: Generic JDBC connector for export
(Bilung Lee via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/0976713f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/0976713f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/0976713f
Branch: refs/heads/sqoop2
Commit: 0976713f0104709565b8c3a4c628c1abdca83569
Parents: 2481b7f
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Sun Nov 4 08:02:53 2012 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Sun Nov 4 08:02:53 2012 -0800
----------------------------------------------------------------------
.../java/org/apache/sqoop/common/MapContext.java | 1 -
.../sqoop/connector/jdbc/GenericJdbcExecutor.java | 48 ++++
.../connector/jdbc/GenericJdbcExportDestroyer.java | 4 +-
.../jdbc/GenericJdbcExportInitializer.java | 170 ++++++++++++++-
.../connector/jdbc/GenericJdbcExportLoader.java | 51 +++++-
.../connector/jdbc/GenericJdbcImportDestroyer.java | 4 +-
.../jdbc/GenericJdbcImportInitializer.java | 14 +-
.../jdbc/GenericJdbcImportPartitioner.java | 1 -
.../connector/jdbc/TestExportInitializer.java | 164 ++++++++++++++
.../sqoop/connector/jdbc/TestExportLoader.java | 140 ++++++++++++
.../main/java/org/apache/sqoop/job/Constants.java | 3 +
.../java/org/apache/sqoop/job/etl/Destroyer.java | 5 +-
.../java/org/apache/sqoop/job/etl/Initializer.java | 8 +-
13 files changed, 589 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/common/src/main/java/org/apache/sqoop/common/MapContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/common/MapContext.java b/common/src/main/java/org/apache/sqoop/common/MapContext.java
index c1d24ad..b245148 100644
--- a/common/src/main/java/org/apache/sqoop/common/MapContext.java
+++ b/common/src/main/java/org/apache/sqoop/common/MapContext.java
@@ -18,7 +18,6 @@
package org.apache.sqoop.common;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index 226fcd3..2dba8af 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.connector.jdbc;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -30,6 +31,7 @@ import org.apache.sqoop.common.SqoopException;
public class GenericJdbcExecutor {
private Connection connection;
+ private PreparedStatement preparedStatement;
public GenericJdbcExecutor(String driver, String url,
String username, String password) {
@@ -71,6 +73,52 @@ public class GenericJdbcExecutor {
}
}
+ public void beginBatch(String sql) { // prepares `sql` and keeps the statement in the preparedStatement field for addBatch/executeBatch/endBatch
+ try {
+ preparedStatement = connection.prepareStatement(sql, // NOTE(review): a statement left from an earlier beginBatch is overwritten here without being closed
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+
+ } catch (SQLException e) { // JDBC failures are rethrown as SqoopException with the original exception attached
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
+ }
+ }
+
+ public void addBatch(Object[] array) { // binds one record to the prepared statement and queues it in the current batch
+ try { // NOTE(review): preparedStatement is only assigned in beginBatch; calling this first would fail with a NullPointerException that the catch below does not cover
+ for (int i=0; i<array.length; i++) {
+ preparedStatement.setObject(i+1, array[i]); // element i goes to parameter i+1
+ }
+ preparedStatement.addBatch();
+ } catch (SQLException e) { // JDBC failures are rethrown as SqoopException with the original exception attached
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
+ }
+ }
+
+ public void executeBatch(boolean commit) { // runs the queued batch; commits the connection only when `commit` is true
+ try {
+ preparedStatement.executeBatch();
+ if (commit) { // NOTE(review): with commit == false, persistence depends on the connection's auto-commit mode, which is not set in this hunk - confirm in the constructor
+ connection.commit();
+ }
+ } catch (SQLException e) { // JDBC failures are rethrown as SqoopException with the original exception attached
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
+ }
+ }
+
+ public void endBatch() { // closes the statement opened by beginBatch; does nothing if none was ever prepared
+ try {
+ if (preparedStatement != null) {
+ preparedStatement.close(); // NOTE(review): the field is not reset to null, so it keeps referring to the closed statement
+ }
+ } catch (SQLException e) { // JDBC failures are rethrown as SqoopException with the original exception attached
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
+ }
+ }
+
public String getPrimaryKey(String table) {
try {
String[] splitNames = dequalify(table);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
index c230f01..7f952ac 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
@@ -17,13 +17,13 @@
*/
package org.apache.sqoop.connector.jdbc;
-import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.etl.Destroyer;
public class GenericJdbcExportDestroyer extends Destroyer {
@Override
- public void run(MapContext context) {
+ public void run(ImmutableContext context) {
// TODO Auto-generated method stub
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
index 0e91767..72b992c 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
@@ -17,14 +17,178 @@
*/
package org.apache.sqoop.connector.jdbc;
-import org.apache.sqoop.common.MutableMapContext;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.utils.ClassUtils;
public class GenericJdbcExportInitializer extends Initializer {
+ private GenericJdbcExecutor executor;
+
+ @Override
+ public void initialize(MutableContext context, Object connectionConfiguration, Object jobConfiguration) { // fills `context` with the JDBC settings, the INSERT statement and the input directory for an export job
+ ConnectionConfiguration connectionConfig = (ConnectionConfiguration)connectionConfiguration;
+ ExportJobConfiguration jobConfig = (ExportJobConfiguration)jobConfiguration;
+ // configureJdbcProperties assigns the executor field as its last statement, so nothing needs closing if it throws
+ configureJdbcProperties(context, connectionConfig, jobConfig);
+ try {
+ configureTableProperties(context, connectionConfig, jobConfig);
+
+ } finally {
+ executor.close(); // always release the executor created by configureJdbcProperties
+ }
+ }
+
@Override
- public void initialize(MutableMapContext context, Object connectionConfiguration, Object jobConfiguration) {
- // TODO Auto-generated method stub
+ public List<String> getJars(ImmutableContext context, Object connectionConfiguration, Object jobConfiguration) { // returns a one-element list; `context` and `jobConfiguration` are not read
+ List<String> jars = new LinkedList<String>();
+
+ ConnectionConfiguration connection = (ConnectionConfiguration) connectionConfiguration;
+ jars.add(ClassUtils.jarForClass(connection.jdbcDriver)); // whatever ClassUtils.jarForClass resolves for the configured JDBC driver class name
+
+ return jars;
+ }
+
+ private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) { // copies connection settings into `context` and opens the executor; `jobConfig` is not read
+ String driver = connectionConfig.jdbcDriver;
+ String url = connectionConfig.connectionString;
+ String username = connectionConfig.username;
+ String password = connectionConfig.password;
+
+ if (driver == null) { // driver is mandatory
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012,
+ GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER);
+ }
+ context.setString(
+ GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER,
+ driver);
+
+ if (url == null) { // connection string is mandatory
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0012,
+ GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING);
+ }
+ context.setString(
+ GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL,
+ url);
+
+ if (username != null) { // username is optional; the key is left unset when absent
+ context.setString(
+ GenericJdbcConnectorConstants.CONNECTOR_JDBC_USERNAME,
+ username);
+ }
+
+ if (password != null) { // password is optional; NOTE(review): it is stored in the context as a plain string - confirm where the context is persisted or logged
+ context.setString(
+ GenericJdbcConnectorConstants.CONNECTOR_JDBC_PASSWORD,
+ password);
+ }
+
+ executor = new GenericJdbcExecutor(driver, url, username, password); // closed by initialize() in its finally block
+ }
+
+ private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) { // derives the INSERT statement and input directory and stores both in `context`
+ String dataSql;
+ String inputDirectory;
+ // NOTE(review): every setting below is read from connectionConfig; jobConfig is never used - confirm that is intended
+ String tableName = connectionConfig.tableName;
+ String tableSql = connectionConfig.sql;
+ String tableColumns = connectionConfig.columns;
+
+ String datadir = connectionConfig.dataDirectory;
+ String warehouse = connectionConfig.warehouse;
+ if (warehouse == null) {
+ warehouse = GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE;
+ } else if (!warehouse.endsWith(GenericJdbcConnectorConstants.FILE_SEPARATOR)) { // ensure a trailing separator so names can be appended directly
+ warehouse += GenericJdbcConnectorConstants.FILE_SEPARATOR;
+ }
+
+ if (tableName != null && tableSql != null) {
+ // when both table name and table sql are specified:
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
+
+ } else if (tableName != null) {
+ // when table name is specified:
+
+ if (tableColumns == null) { // no column list: probe the table with a query that returns no rows to learn its columns
+ String[] columns = executor.getQueryColumns("SELECT * FROM "
+ + executor.delimitIdentifier(tableName) + " WHERE 1 = 0");
+ StringBuilder builder = new StringBuilder();
+ builder.append("INSERT INTO ");
+ builder.append(executor.delimitIdentifier(tableName));
+ builder.append(" VALUES (?");
+ for (int i = 1; i < columns.length; i++) { // starts at 1 because the first marker is already appended above
+ builder.append(",?");
+ }
+ builder.append(")");
+ dataSql = builder.toString();
+
+ } else { // explicit column list: one marker per comma-separated entry
+ String[] columns = StringUtils.split(tableColumns, ',');
+ StringBuilder builder = new StringBuilder();
+ builder.append("INSERT INTO ");
+ builder.append(executor.delimitIdentifier(tableName));
+ builder.append(" (");
+ builder.append(tableColumns); // NOTE(review): appended verbatim, unlike the table name which goes through delimitIdentifier
+ builder.append(") VALUES (?");
+ for (int i = 1; i < columns.length; i++) { // starts at 1 because the first marker is already appended above
+ builder.append(",?");
+ }
+ builder.append(")");
+ dataSql = builder.toString();
+ }
+
+ if (datadir == null) { // default the input directory to the table name under the warehouse
+ inputDirectory = warehouse + tableName;
+ } else {
+ inputDirectory = warehouse + datadir;
+ }
+
+ } else if (tableSql != null) {
+ // when table sql is specified:
+
+ if (tableSql.indexOf(
+ GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) {
+ // make sure parameter marker is in the specified sql
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0013);
+ }
+
+ if (tableColumns == null) { // user-supplied SQL is used as-is
+ dataSql = tableSql;
+ } else { // a column list together with custom SQL is rejected
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0014);
+ }
+
+ if (datadir == null) { // no table name to fall back on, so use the default data directory
+ inputDirectory =
+ warehouse + GenericJdbcConnectorConstants.DEFAULT_DATADIR;
+ } else {
+ inputDirectory = warehouse + datadir;
+ }
+
+ } else {
+ // when neither are specified:
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
+ }
+
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
+ dataSql.toString()); // NOTE(review): dataSql is already a String, so toString() is redundant
+ context.setString(Constants.JOB_ETL_INPUT_DIRECTORY, inputDirectory);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
index 4cf0595..ff7384c 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
@@ -23,9 +23,58 @@ import org.apache.sqoop.job.io.DataReader;
public class GenericJdbcExportLoader extends Loader {
+ public static final int DEFAULT_ROWS_PER_BATCH = 100;
+ public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100;
+ private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH; // rows queued before each executeBatch; nothing in this hunk changes it from the default
+ private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION; // batches executed before a commit is requested; nothing in this hunk changes it from the default
+
@Override
public void run(ImmutableContext context, DataReader reader) {
- // TODO Auto-generated method stub
+ String driver = context.getString( // connection settings are read from the keys that GenericJdbcExportInitializer writes
+ GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER);
+ String url = context.getString(
+ GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL);
+ String username = context.getString( // the initializer sets this key only when a username was configured
+ GenericJdbcConnectorConstants.CONNECTOR_JDBC_USERNAME);
+ String password = context.getString( // the initializer sets this key only when a password was configured
+ GenericJdbcConnectorConstants.CONNECTOR_JDBC_PASSWORD);
+ GenericJdbcExecutor executor = new GenericJdbcExecutor(
+ driver, url, username, password);
+
+ String sql = context.getString(
+ GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL);
+ executor.beginBatch(sql); // NOTE(review): called before the try, so executor.close() is skipped if preparing the statement throws
+ try {
+ int numberOfRows = 0; // rows added to the current, not yet executed batch
+ int numberOfBatches = 0; // batches executed since the last commit
+ Object[] array;
+
+ while ((array = reader.readArrayRecord()) != null) { // a null record ends the input
+ numberOfRows++;
+ executor.addBatch(array);
+
+ if (numberOfRows == rowsPerBatch) { // batch is full: execute it
+ numberOfBatches++;
+ if (numberOfBatches == batchesPerTransaction) { // enough batches for one transaction: execute and commit
+ executor.executeBatch(true);
+ numberOfBatches = 0;
+ } else {
+ executor.executeBatch(false);
+ }
+ numberOfRows = 0;
+ }
+ }
+
+ if (numberOfRows != 0) { // NOTE(review): if the input ends exactly on a batch boundary, numberOfRows is 0 while numberOfBatches may be non-zero, so batches run with executeBatch(false) get no commit here
+ // execute and commit the remaining rows
+ executor.executeBatch(true);
+ }
+
+ executor.endBatch(); // NOTE(review): not in the finally block, so the statement is not closed here when an exception is thrown above
+
+ } finally {
+ executor.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
index 3f6718d..a53fa59 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
@@ -17,13 +17,13 @@
*/
package org.apache.sqoop.connector.jdbc;
-import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.etl.Destroyer;
public class GenericJdbcImportDestroyer extends Destroyer {
@Override
- public void run(MapContext context) {
+ public void run(ImmutableContext context) {
// TODO Auto-generated method stub
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
index 2075d99..f8e941c 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
@@ -25,8 +25,8 @@ import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
-import org.apache.sqoop.common.MapContext;
-import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
@@ -42,7 +42,7 @@ public class GenericJdbcImportInitializer extends Initializer {
private GenericJdbcExecutor executor;
@Override
- public void initialize(MutableMapContext context, Object oConnectionConfig, Object oJobConfig) {
+ public void initialize(MutableContext context, Object oConnectionConfig, Object oJobConfig) {
ConnectionConfiguration connectionConfig = (ConnectionConfiguration)oConnectionConfig;
ImportJobConfiguration jobConfig = (ImportJobConfiguration)oJobConfig;
@@ -58,7 +58,7 @@ public class GenericJdbcImportInitializer extends Initializer {
}
@Override
- public List<String> getJars(MapContext context, Object connectionConfiguration, Object jobConfiguration) {
+ public List<String> getJars(ImmutableContext context, Object connectionConfiguration, Object jobConfiguration) {
List<String> jars = new LinkedList<String>();
ConnectionConfiguration connection = (ConnectionConfiguration) connectionConfiguration;
@@ -67,7 +67,7 @@ public class GenericJdbcImportInitializer extends Initializer {
return jars;
}
- private void configureJdbcProperties(MutableMapContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
+ private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
String driver = connectionConfig.jdbcDriver;
String url = connectionConfig.connectionString;
String username = connectionConfig.username;
@@ -107,7 +107,7 @@ public class GenericJdbcImportInitializer extends Initializer {
executor = new GenericJdbcExecutor(driver, url, username, password);
}
- private void configurePartitionProperties(MutableMapContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
+ private void configurePartitionProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
// ----- configure column name -----
String partitionColumnName = connectionConfig.partitionColumn;
@@ -207,7 +207,7 @@ public class GenericJdbcImportInitializer extends Initializer {
}
}
- private void configureTableProperties(MutableMapContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
+ private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ImportJobConfiguration jobConfig) {
String dataSql;
String fieldNames;
String outputDirectory;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
index 5071471..a6d3b52 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
@@ -22,7 +22,6 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.sqoop.common.ImmutableContext;
-import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Partition;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
new file mode 100644
index 0000000..532e6fd
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.util.Hashtable;
+
+import junit.framework.TestCase;
+
+import org.apache.sqoop.job.Constants;
+import org.apache.sqoop.job.etl.Initializer;
+//import org.apache.sqoop.job.etl.MutableContext;
+//import org.apache.sqoop.job.etl.Options;
+import org.junit.Test;
+
+public class TestExportInitializer extends TestCase {
+
+ private final String tableName;
+ private final String tableSql;
+ private final String tableColumns;
+
+ private GenericJdbcExecutor executor;
+
+ public TestExportInitializer() {
+ tableName = getClass().getSimpleName();
+ tableSql = "INSERT INTO \"" + tableName + "\" VALUES (?,?,?)";
+ tableColumns = "ICOL,VCOL";
+ }
+
+ public void testVoid() { } // NOTE(review): empty placeholder; the real test methods below are commented out in this commit - presumably kept so the TestCase still has a test method
+
+// @Override
+// public void setUp() {
+// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
+// GenericJdbcTestConstants.URL, null, null);
+//
+// if (!executor.existTable(tableName)) {
+// executor.executeUpdate("CREATE TABLE "
+// + executor.delimitIdentifier(tableName)
+// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+// }
+// }
+//
+// @Override
+// public void tearDown() {
+// executor.close();
+// }
+//
+// @Test
+// public void testTableName() throws Exception {
+// DummyOptions options = new DummyOptions();
+// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER,
+// GenericJdbcTestConstants.DRIVER);
+// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING,
+// GenericJdbcTestConstants.URL);
+// options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_NAME,
+// tableName);
+//
+// DummyContext context = new DummyContext();
+//
+// Initializer initializer = new GenericJdbcExportInitializer();
+// initializer.run(context, options);
+//
+// verifyResult(context,
+// "INSERT INTO " + executor.delimitIdentifier(tableName)
+// + " VALUES (?,?,?)",
+// GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName);
+// }
+//
+// @Test
+// public void testTableNameWithTableColumns() throws Exception {
+// DummyOptions options = new DummyOptions();
+// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER,
+// GenericJdbcTestConstants.DRIVER);
+// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING,
+// GenericJdbcTestConstants.URL);
+// options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_NAME,
+// tableName);
+// options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_COLUMNS,
+// tableColumns);
+//
+// DummyContext context = new DummyContext();
+//
+// Initializer initializer = new GenericJdbcExportInitializer();
+// initializer.run(context, options);
+//
+// verifyResult(context,
+// "INSERT INTO " + executor.delimitIdentifier(tableName)
+// + " (" + tableColumns + ") VALUES (?,?)",
+// GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE + tableName);
+// }
+//
+// @Test
+// public void testTableSql() throws Exception {
+// DummyOptions options = new DummyOptions();
+// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_JDBCDRIVER,
+// GenericJdbcTestConstants.DRIVER);
+// options.setOption(GenericJdbcConnectorConstants.INPUT_CONN_CONNECTSTRING,
+// GenericJdbcTestConstants.URL);
+// options.setOption(GenericJdbcConnectorConstants.INPUT_TBL_SQL,
+// tableSql);
+//
+// DummyContext context = new DummyContext();
+//
+// Initializer initializer = new GenericJdbcExportInitializer();
+// initializer.run(context, options);
+//
+// verifyResult(context,
+// "INSERT INTO " + executor.delimitIdentifier(tableName)
+// + " VALUES (?,?,?)",
+// GenericJdbcConnectorConstants.DEFAULT_WAREHOUSE
+// + GenericJdbcConnectorConstants.DEFAULT_DATADIR);
+// }
+//
+// private void verifyResult(DummyContext context,
+// String dataSql, String inputDirectory) {
+// assertEquals(dataSql, context.getString(
+// GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL));
+// assertEquals(inputDirectory, context.getString(
+// Constants.JOB_ETL_INPUT_DIRECTORY));
+// }
+//
+// public class DummyOptions implements Options {
+// Hashtable<String, String> store = new Hashtable<String, String>();
+//
+// public void setOption(String key, String value) {
+// store.put(key, value);
+// }
+//
+// @Override
+// public String getOption(String key) {
+// return store.get(key);
+// }
+// }
+//
+// public class DummyContext implements MutableContext {
+// Hashtable<String, String> store = new Hashtable<String, String>();
+//
+// @Override
+// public String getString(String key) {
+// return store.get(key);
+// }
+//
+// @Override
+// public void setString(String key, String value) {
+// store.put(key, value);
+// }
+// }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
new file mode 100644
index 0000000..649808d
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.sql.ResultSet;
+import java.util.HashMap;
+
+import junit.framework.TestCase;
+
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.io.DataReader;
+import org.junit.Test;
+
+public class TestExportLoader extends TestCase {
+
+ private final String tableName;
+
+ private GenericJdbcExecutor executor;
+
+ private static final int START = -50;
+ private static final int NUMBER_OF_ROWS = 101;
+
+ public TestExportLoader() {
+ tableName = getClass().getSimpleName();
+ }
+
+ public void testVoid() { } // NOTE(review): empty placeholder; the real test methods below are commented out in this commit - presumably kept so the TestCase still has a test method
+
+// @Override
+// public void setUp() {
+// executor = new GenericJdbcExecutor(GenericJdbcTestConstants.DRIVER,
+// GenericJdbcTestConstants.URL, null, null);
+//
+// if (!executor.existTable(tableName)) {
+// executor.executeUpdate("CREATE TABLE "
+// + executor.delimitIdentifier(tableName)
+// + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+// }
+// }
+//
+// @Override
+// public void tearDown() {
+// executor.close();
+// }
+//
+// @Test
+// public void testInsert() throws Exception {
+// DummyContext context = new DummyContext();
+// context.setString(
+// GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER,
+// GenericJdbcTestConstants.DRIVER);
+// context.setString(
+// GenericJdbcConnectorConstants.CONNECTOR_JDBC_URL,
+// GenericJdbcTestConstants.URL);
+// context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
+// "INSERT INTO " + executor.delimitIdentifier(tableName)
+// + " VALUES (?,?,?)");
+//
+// Loader loader = new GenericJdbcExportLoader();
+// DummyReader reader = new DummyReader();
+//
+// loader.run(context, reader);
+//
+// int index = START;
+// ResultSet rs = executor.executeQuery("SELECT * FROM "
+// + executor.delimitIdentifier(tableName) + " ORDER BY ICOL");
+// while (rs.next()) {
+// assertEquals(Integer.valueOf(index), rs.getObject(1));
+// assertEquals(Double.valueOf(index), rs.getObject(2));
+// assertEquals(String.valueOf(index), rs.getObject(3));
+// index++;
+// }
+// assertEquals(NUMBER_OF_ROWS, index-START);
+// }
+//
+// public class DummyContext implements MutableContext {
+// HashMap<String, String> store = new HashMap<String, String>();
+//
+// @Override
+// public String getString(String key) {
+// return store.get(key);
+// }
+//
+// @Override
+// public void setString(String key, String value) {
+// store.put(key, value);
+// }
+// }
+//
+// public class DummyReader extends DataReader {
+// int index = 0;
+//
+// @Override
+// public void setFieldDelimiter(char fieldDelimiter) {
+// // do nothing and use default delimiter
+// }
+//
+// @Override
+// public Object[] readArrayRecord() {
+// if (index < NUMBER_OF_ROWS) {
+// Object[] array = new Object[] {
+// new Integer(START+index),
+// new Double(START+index),
+// String.valueOf(START+index) };
+// index++;
+// return array;
+// } else {
+// return null;
+// }
+// }
+//
+// @Override
+// public String readCsvRecord() {
+// fail("This method should not be invoked.");
+// return null;
+// }
+//
+// @Override
+// public Object readContent(int type) {
+// fail("This method should not be invoked.");
+// return null;
+// }
+// }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/spi/src/main/java/org/apache/sqoop/job/Constants.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/Constants.java b/spi/src/main/java/org/apache/sqoop/job/Constants.java
index 927950d..90935cf 100644
--- a/spi/src/main/java/org/apache/sqoop/job/Constants.java
+++ b/spi/src/main/java/org/apache/sqoop/job/Constants.java
@@ -34,6 +34,9 @@ public class Constants {
public static final String JOB_ETL_OUTPUT_DIRECTORY = PREFIX_CONFIG
+ "etl.output.directory";
+ public static final String JOB_ETL_INPUT_DIRECTORY = PREFIX_CONFIG
+ + "etl.input.directory";
+
protected Constants() {
// Disable explicit object creation
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
index 37b9f1b..c8dc7c3 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
@@ -17,7 +17,7 @@
*/
package org.apache.sqoop.job.etl;
-import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.common.ImmutableContext;
/**
* This allows connector to define work to complete execution, for example,
@@ -25,7 +25,6 @@ import org.apache.sqoop.common.MapContext;
*/
public abstract class Destroyer {
- // TODO(Jarcec): This should be called with ImmutableContext
- public abstract void run(MapContext context);
+ public abstract void run(ImmutableContext context);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0976713f/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
index 2092815..685378f 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
@@ -17,8 +17,8 @@
*/
package org.apache.sqoop.job.etl;
-import org.apache.sqoop.common.MapContext;
-import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.MutableContext;
import java.util.LinkedList;
import java.util.List;
@@ -38,7 +38,7 @@ public abstract class Initializer {
* @param connectionConfiguration Connector's connection configuration object
* @param jobConfiguration Connector's job configuration object
*/
- public abstract void initialize(MutableMapContext context,
+ public abstract void initialize(MutableContext context,
Object connectionConfiguration,
Object jobConfiguration);
@@ -49,7 +49,7 @@ public abstract class Initializer {
*
* @return
*/
- public List<String> getJars(MapContext context,
+ public List<String> getJars(ImmutableContext context,
Object connectionConfiguration,
Object jobConfiguration) {
return new LinkedList<String>();