You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/08/12 00:01:18 UTC
[15/19] SQOOP-1376: Sqoop2: From/To: Refactor connector interface
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
new file mode 100644
index 0000000..816821e
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.utils.ClassUtils;
+
+public class GenericJdbcToInitializer extends Initializer<ConnectionConfiguration, ToJobConfiguration> {
+
+ private GenericJdbcExecutor executor;
+ private static final Logger LOG =
+ Logger.getLogger(GenericJdbcToInitializer.class);
+
+ @Override
+ public void initialize(InitializerContext context, ConnectionConfiguration connection, ToJobConfiguration job) {
+ configureJdbcProperties(context.getContext(), connection, job);
+ try {
+ configureTableProperties(context.getContext(), connection, job);
+ } finally {
+ executor.close();
+ }
+ }
+
+ @Override
+ public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, ToJobConfiguration job) {
+ List<String> jars = new LinkedList<String>();
+
+ jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver));
+
+ return jars;
+ }
+
+ @Override
+ public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ToJobConfiguration toJobConfiguration) {
+ configureJdbcProperties(context.getContext(), connectionConfiguration, toJobConfiguration);
+
+ String schemaName = toJobConfiguration.table.tableName;
+
+ if (schemaName == null) {
+ throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019,
+ "Table name extraction not supported yet.");
+ }
+
+ if(toJobConfiguration.table.schemaName != null) {
+ schemaName = toJobConfiguration.table.schemaName + "." + schemaName;
+ }
+
+ Schema schema = new Schema(schemaName);
+ ResultSet rs = null;
+ ResultSetMetaData rsmt = null;
+ try {
+ rs = executor.executeQuery("SELECT * FROM " + schemaName + " WHERE 1 = 0");
+
+ rsmt = rs.getMetaData();
+ for (int i = 1 ; i <= rsmt.getColumnCount(); i++) {
+ Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i));
+
+ String columnName = rsmt.getColumnName(i);
+ if (columnName == null || columnName.equals("")) {
+ columnName = rsmt.getColumnLabel(i);
+ if (null == columnName) {
+ columnName = "Column " + i;
+ }
+ }
+
+ column.setName(columnName);
+ schema.addColumn(column);
+ }
+
+ return schema;
+ } catch (SQLException e) {
+ throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
+ } finally {
+ if(rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ LOG.info("Ignoring exception while closing ResultSet", e);
+ }
+ }
+ }
+ }
+
+ private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ToJobConfiguration jobConfig) {
+ String driver = connectionConfig.connection.jdbcDriver;
+ String url = connectionConfig.connection.connectionString;
+ String username = connectionConfig.connection.username;
+ String password = connectionConfig.connection.password;
+
+ assert driver != null;
+ assert url != null;
+
+ executor = new GenericJdbcExecutor(driver, url, username, password);
+ }
+
+ private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ToJobConfiguration jobConfig) {
+ String dataSql;
+
+ String schemaName = jobConfig.table.schemaName;
+ String tableName = jobConfig.table.tableName;
+ String stageTableName = jobConfig.table.stageTableName;
+ boolean clearStageTable = jobConfig.table.clearStageTable == null ?
+ false : jobConfig.table.clearStageTable;
+ final boolean stageEnabled =
+ stageTableName != null && stageTableName.length() > 0;
+ String tableSql = jobConfig.table.sql;
+ String tableColumns = jobConfig.table.columns;
+
+ if (tableName != null && tableSql != null) {
+ // when both table name and table sql are specified:
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
+
+ } else if (tableName != null) {
+ // when table name is specified:
+ if(stageEnabled) {
+ LOG.info("Stage has been enabled.");
+ LOG.info("Use stageTable: " + stageTableName +
+ " with clearStageTable: " + clearStageTable);
+
+ if(clearStageTable) {
+ executor.deleteTableData(stageTableName);
+ } else {
+ long stageRowCount = executor.getTableRowCount(stageTableName);
+ if(stageRowCount > 0) {
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0017);
+ }
+ }
+ }
+
+ // For databases that support schemas (IE: postgresql).
+ final String tableInUse = stageEnabled ? stageTableName : tableName;
+ String fullTableName = (schemaName == null) ?
+ executor.delimitIdentifier(tableInUse) :
+ executor.delimitIdentifier(schemaName) +
+ "." + executor.delimitIdentifier(tableInUse);
+
+ if (tableColumns == null) {
+ String[] columns = executor.getQueryColumns("SELECT * FROM "
+ + fullTableName + " WHERE 1 = 0");
+ StringBuilder builder = new StringBuilder();
+ builder.append("INSERT INTO ");
+ builder.append(fullTableName);
+ builder.append(" VALUES (?");
+ for (int i = 1; i < columns.length; i++) {
+ builder.append(",?");
+ }
+ builder.append(")");
+ dataSql = builder.toString();
+
+ } else {
+ String[] columns = StringUtils.split(tableColumns, ',');
+ StringBuilder builder = new StringBuilder();
+ builder.append("INSERT INTO ");
+ builder.append(fullTableName);
+ builder.append(" (");
+ builder.append(tableColumns);
+ builder.append(") VALUES (?");
+ for (int i = 1; i < columns.length; i++) {
+ builder.append(",?");
+ }
+ builder.append(")");
+ dataSql = builder.toString();
+ }
+ } else if (tableSql != null) {
+ // when table sql is specified:
+
+ if (tableSql.indexOf(
+ GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) {
+ // make sure parameter marker is in the specified sql
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0013);
+ }
+
+ if (tableColumns == null) {
+ dataSql = tableSql;
+ } else {
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0014);
+ }
+ } else {
+ // when neither are specified:
+ throw new SqoopException(
+ GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
+ }
+
+ context.setString(GenericJdbcConnectorConstants.CONNECTOR_TO_JDBC_DATA_SQL,
+ dataSql);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
index 0c5f6e1..92f70e2 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
@@ -18,9 +18,8 @@
package org.apache.sqoop.connector.jdbc;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
-import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.Validation;
import org.apache.sqoop.validation.Validator;
@@ -67,20 +66,13 @@ public class GenericJdbcValidator extends Validator {
}
@Override
- public Validation validateJob(MJob.Type type, Object jobConfiguration) {
- switch(type) {
- case IMPORT:
- return validateImportJob(jobConfiguration);
- case EXPORT:
- return validateExportJob(jobConfiguration);
- default:
- return super.validateJob(type, jobConfiguration);
- }
+ public Validation validateJob(Object jobConfiguration) {
+ return super.validateJob(jobConfiguration);
}
private Validation validateExportJob(Object jobConfiguration) {
- Validation validation = new Validation(ExportJobConfiguration.class);
- ExportJobConfiguration configuration = (ExportJobConfiguration)jobConfiguration;
+ Validation validation = new Validation(ToJobConfiguration.class);
+ ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration;
if(configuration.table.tableName == null && configuration.table.sql == null) {
validation.addMessage(Status.UNACCEPTABLE, "table", "Either table name or SQL must be specified");
@@ -104,8 +96,8 @@ public class GenericJdbcValidator extends Validator {
}
private Validation validateImportJob(Object jobConfiguration) {
- Validation validation = new Validation(ImportJobConfiguration.class);
- ImportJobConfiguration configuration = (ImportJobConfiguration)jobConfiguration;
+ Validation validation = new Validation(FromJobConfiguration.class);
+ FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration;
if(configuration.table.tableName == null && configuration.table.sql == null) {
validation.addMessage(Status.UNACCEPTABLE, "table", "Either table name or SQL must be specified");
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportJobConfiguration.java
deleted file mode 100644
index f2b2d65..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportJobConfiguration.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-/**
- *
- */
-@ConfigurationClass
-public class ExportJobConfiguration {
- @Form public ExportTableForm table;
-
- public ExportJobConfiguration() {
- table = new ExportTableForm();
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
deleted file mode 100644
index 14a7033..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ExportTableForm.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- *
- */
-@FormClass
-public class ExportTableForm {
- @Input(size = 50) public String schemaName;
- @Input(size = 2000) public String tableName;
- @Input(size = 50) public String sql;
- @Input(size = 50) public String columns;
- @Input(size = 2000) public String stageTableName;
- @Input public Boolean clearStageTable;
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
new file mode 100644
index 0000000..bd1c4be
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+/**
+ *
+ */
+@ConfigurationClass
+public class FromJobConfiguration {
+ @Form public FromTableForm table;
+
+ public FromJobConfiguration() {
+ table = new FromTableForm();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromTableForm.java
new file mode 100644
index 0000000..8f6fb60
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromTableForm.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ *
+ */
+@FormClass
+public class FromTableForm {
+ @Input(size = 50) public String schemaName;
+ @Input(size = 50) public String tableName;
+ @Input(size = 2000) public String sql;
+ @Input(size = 50) public String columns;
+ @Input(size = 50) public String partitionColumn;
+ @Input public Boolean partitionColumnNull;
+ @Input(size = 50) public String boundaryQuery;
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportJobConfiguration.java
deleted file mode 100644
index f3c1d13..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportJobConfiguration.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-/**
- *
- */
-@ConfigurationClass
-public class ImportJobConfiguration {
- @Form public ImportTableForm table;
-
- public ImportJobConfiguration() {
- table = new ImportTableForm();
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
deleted file mode 100644
index 0991b28..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ImportTableForm.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- *
- */
-@FormClass
-public class ImportTableForm {
- @Input(size = 50) public String schemaName;
- @Input(size = 50) public String tableName;
- @Input(size = 2000) public String sql;
- @Input(size = 50) public String columns;
- @Input(size = 50) public String partitionColumn;
- @Input public Boolean partitionColumnNull;
- @Input(size = 50) public String boundaryQuery;
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToJobConfiguration.java
new file mode 100644
index 0000000..a0f837e
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToJobConfiguration.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+/**
+ *
+ */
+@ConfigurationClass
+public class ToJobConfiguration {
+ @Form public ToTableForm table;
+
+ public ToJobConfiguration() {
+ table = new ToTableForm();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToTableForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToTableForm.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToTableForm.java
new file mode 100644
index 0000000..dca0bf9
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/ToTableForm.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ *
+ */
+@FormClass
+public class ToTableForm {
+ @Input(size = 50) public String schemaName;
+ @Input(size = 2000) public String tableName;
+ @Input(size = 50) public String sql;
+ @Input(size = 50) public String columns;
+ @Input(size = 2000) public String stageTableName;
+ @Input public Boolean clearStageTable;
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
index 3c5ca39..73106ab 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
@@ -22,7 +22,7 @@ import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
+//import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.model.MJob;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
index 5b7a1e3..420e3ad 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
@@ -27,7 +27,7 @@ import java.util.Collection;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
+//import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
index 9130375..8ded5a4 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
@@ -22,7 +22,7 @@ import junit.framework.TestCase;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.etl.io.DataWriter;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
index 15c38aa..c5eb852 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
@@ -24,7 +24,7 @@ import junit.framework.TestCase;
import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
index 5b574c8..b48931c 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
@@ -31,7 +31,7 @@ import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+//import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.Constants;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java b/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java
index 17215f0..346b625 100644
--- a/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java
+++ b/connector/connector-mysql-jdbc/src/main/java/org/apache/sqoop/connector/mysqljdbc/MySqlJdbcConnector.java
@@ -23,8 +23,8 @@ import java.util.List;
import java.util.Locale;
import java.util.ResourceBundle;
-import org.apache.sqoop.job.etl.Exporter;
-import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.connector.spi.SqoopConnector;
@@ -53,13 +53,13 @@ public class MySqlJdbcConnector implements SqoopConnector {
}
@Override
- public Importer getImporter() {
+ public From getImporter() {
// TODO Auto-generated method stub
return null;
}
@Override
- public Exporter getExporter() {
+ public To getExporter() {
// TODO Auto-generated method stub
return null;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
index b80de7f..ca4b253 100644
--- a/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
+++ b/core/src/main/java/org/apache/sqoop/connector/ConnectorHandler.java
@@ -19,18 +19,16 @@ package org.apache.sqoop.connector;
import java.io.IOException;
import java.net.URL;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Properties;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.core.ConfigurationConstants;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MJobForms;
public final class ConnectorHandler {
@@ -93,21 +91,19 @@ public final class ConnectorHandler {
}
// Initialize Metadata
- List<MJobForms> jobForms = new LinkedList<MJobForms>();
- for(MJob.Type type : MJob.Type.values()) {
- Class klass = connector.getJobConfigurationClass(type);
- if(klass != null) {
- jobForms.add(new MJobForms(type, FormUtils.toForms(klass)));
- }
- }
-
+ MJobForms fromJobForms = new MJobForms(FormUtils.toForms(
+ connector.getJobConfigurationClass(ConnectorType.FROM)));
MConnectionForms connectionForms = new MConnectionForms(
FormUtils.toForms(connector.getConnectionConfigurationClass()));
+ MJobForms toJobForms = new MJobForms(FormUtils.toForms(
+ connector.getJobConfigurationClass(ConnectorType.TO)));
+ MConnectionForms toConnectionForms = new MConnectionForms(
+ FormUtils.toForms(connector.getConnectionConfigurationClass()));
String connectorVersion = connector.getVersion();
- mConnector = new MConnector(connectorUniqueName, connectorClassName,
- connectorVersion, connectionForms, jobForms);
+ mConnector = new MConnector(connectorUniqueName, connectorClassName, connectorVersion,
+ connectionForms, fromJobForms, toJobForms);
if (LOG.isInfoEnabled()) {
LOG.info("Connector [" + connectorClassName + "] initialized.");
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
index f43942d..96ec148 100644
--- a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
+++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
@@ -52,15 +52,9 @@ public abstract class ExecutionEngine {
}
/**
- * Prepare given submission request for import job type.
+ * Prepare given submission request.
*
* @param request Submission request
*/
- public abstract void prepareImportSubmission(SubmissionRequest request);
-
- /**
- * Prepare given submission request for export job type..
- * @param request
- */
- public abstract void prepareExportSubmission(SubmissionRequest request);
+ public abstract void prepareSubmission(SubmissionRequest request);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 505121c..81e1147 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -24,14 +24,11 @@ import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
+import org.apache.sqoop.framework.configuration.JobConfiguration;
import org.apache.sqoop.model.*;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.validation.Validator;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Locale;
import java.util.ResourceBundle;
@@ -113,31 +110,20 @@ public class FrameworkManager implements Reconfigurable {
public static final String CURRENT_FRAMEWORK_VERSION = "1";
- public Class getJobConfigurationClass(MJob.Type jobType) {
- switch (jobType) {
- case IMPORT:
- return ImportJobConfiguration.class;
- case EXPORT:
- return ExportJobConfiguration.class;
- default:
- return null;
- }
+ public Class getJobConfigurationClass() {
+ return JobConfiguration.class;
+ }
+
+ public Class getConnectionConfigurationClass() {
+ return ConnectionConfiguration.class;
}
- public Class getConnectionConfigurationClass() {
- return ConnectionConfiguration.class;
- }
public FrameworkManager() {
MConnectionForms connectionForms = new MConnectionForms(
FormUtils.toForms(getConnectionConfigurationClass())
);
- List<MJobForms> jobForms = new LinkedList<MJobForms>();
- jobForms.add(new MJobForms(MJob.Type.IMPORT,
- FormUtils.toForms(getJobConfigurationClass(MJob.Type.IMPORT))));
- jobForms.add(new MJobForms(MJob.Type.EXPORT,
- FormUtils.toForms(getJobConfigurationClass(MJob.Type.EXPORT))));
- mFramework = new MFramework(connectionForms, jobForms,
- CURRENT_FRAMEWORK_VERSION);
+ mFramework = new MFramework(connectionForms, new MJobForms(FormUtils.toForms(getJobConfigurationClass())),
+ CURRENT_FRAMEWORK_VERSION);
// Build validator
validator = new FrameworkValidator();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
index f5f6a36..f19a23e 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
@@ -18,13 +18,11 @@
package org.apache.sqoop.framework;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
import org.apache.sqoop.framework.configuration.InputForm;
+import org.apache.sqoop.framework.configuration.JobConfiguration;
import org.apache.sqoop.framework.configuration.OutputCompression;
import org.apache.sqoop.framework.configuration.OutputForm;
import org.apache.sqoop.framework.configuration.ThrottlingForm;
-import org.apache.sqoop.model.MJob;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.Validation;
import org.apache.sqoop.validation.Validator;
@@ -43,61 +41,57 @@ public class FrameworkValidator extends Validator {
@Override
- public Validation validateJob(MJob.Type type, Object jobConfiguration) {
- switch(type) {
- case IMPORT:
- return validateImportJob(jobConfiguration);
- case EXPORT:
- return validateExportJob(jobConfiguration);
- default:
- return super.validateJob(type, jobConfiguration);
- }
- }
-
- private Validation validateExportJob(Object jobConfiguration) {
- Validation validation = new Validation(ExportJobConfiguration.class);
- ExportJobConfiguration configuration = (ExportJobConfiguration)jobConfiguration;
-
- validateInputForm(validation, configuration.input);
- validateThrottingForm(validation, configuration.throttling);
-
- return validation;
- }
-
- private Validation validateImportJob(Object jobConfiguration) {
- Validation validation = new Validation(ImportJobConfiguration.class);
- ImportJobConfiguration configuration = (ImportJobConfiguration)jobConfiguration;
-
- validateOutputForm(validation, configuration.output);
+ public Validation validateJob(Object jobConfiguration) {
+ JobConfiguration configuration = (JobConfiguration)jobConfiguration;
+ Validation validation = new Validation(JobConfiguration.class);
validateThrottingForm(validation, configuration.throttling);
-
- return validation;
+ return super.validateJob(jobConfiguration);
}
- private void validateInputForm(Validation validation, InputForm input) {
- if(input.inputDirectory == null || input.inputDirectory.isEmpty()) {
- validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty");
- }
- }
+// private Validation validateExportJob(Object jobConfiguration) {
+// Validation validation = new Validation(ExportJobConfiguration.class);
+// ExportJobConfiguration configuration = (ExportJobConfiguration)jobConfiguration;
+//
+// validateInputForm(validation, configuration.input);
+// validateThrottingForm(validation, configuration.throttling);
+//
+// return validation;
+// }
+//
+// private Validation validateImportJob(Object jobConfiguration) {
+// Validation validation = new Validation(ImportJobConfiguration.class);
+// ImportJobConfiguration configuration = (ImportJobConfiguration)jobConfiguration;
+//
+// validateOutputForm(validation, configuration.output);
+// validateThrottingForm(validation, configuration.throttling);
+//
+// return validation;
+// }
- private void validateOutputForm(Validation validation, OutputForm output) {
- if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
- validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
- }
- if(output.customCompression != null &&
- output.customCompression.trim().length() > 0 &&
- output.compression != OutputCompression.CUSTOM) {
- validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
- "custom compression should be blank as " + output.compression + " is being used.");
- }
- if(output.compression == OutputCompression.CUSTOM &&
- (output.customCompression == null ||
- output.customCompression.trim().length() == 0)
- ) {
- validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
- "custom compression is blank.");
- }
- }
+// private void validateInputForm(Validation validation, InputForm input) {
+// if(input.inputDirectory == null || input.inputDirectory.isEmpty()) {
+// validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty");
+// }
+// }
+//
+// private void validateOutputForm(Validation validation, OutputForm output) {
+// if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
+// validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
+// }
+// if(output.customCompression != null &&
+// output.customCompression.trim().length() > 0 &&
+// output.compression != OutputCompression.CUSTOM) {
+// validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
+// "custom compression should be blank as " + output.compression + " is being used.");
+// }
+// if(output.compression == OutputCompression.CUSTOM &&
+// (output.customCompression == null ||
+// output.customCompression.trim().length() == 0)
+// ) {
+// validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
+// "custom compression is blank.");
+// }
+// }
private void validateThrottingForm(Validation validation, ThrottlingForm throttling) {
if(throttling.extractors != null && throttling.extractors < 1) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index 1700432..e0bf011 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -18,17 +18,17 @@
package org.apache.sqoop.framework;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
+import org.apache.sqoop.framework.configuration.JobConfiguration;
import org.apache.sqoop.request.HttpEventContext;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.core.Reconfigurable;
import org.apache.sqoop.core.SqoopConfiguration;
import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
import org.apache.sqoop.job.etl.*;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MConnection;
@@ -280,34 +280,52 @@ public class JobManager implements Reconfigurable {
"Job id: " + job.getPersistenceId());
}
- MConnection connection = repository.findConnection(job.getConnectionId());
+ MConnection fromConnection = repository.findConnection(job.getConnectionId(ConnectorType.FROM));
+ MConnection toConnection = repository.findConnection(job.getConnectionId(ConnectorType.TO));
- if (!connection.getEnabled()) {
+ if (!fromConnection.getEnabled()) {
throw new SqoopException(FrameworkError.FRAMEWORK_0010,
- "Connection id: " + connection.getPersistenceId());
+ "Connection id: " + fromConnection.getPersistenceId());
}
- SqoopConnector connector =
- ConnectorManager.getInstance().getConnector(job.getConnectorId());
+ SqoopConnector fromConnector =
+ ConnectorManager.getInstance().getConnector(job.getConnectorId(ConnectorType.FROM));
+ SqoopConnector toConnector =
+ ConnectorManager.getInstance().getConnector(job.getConnectorId(ConnectorType.TO));
- // Transform forms to connector specific classes
- Object connectorConnection = ClassUtils.instantiate(
- connector.getConnectionConfigurationClass());
- FormUtils.fromForms(connection.getConnectorPart().getForms(),
- connectorConnection);
+ // Transform forms to fromConnector specific classes
+ Object fromConnectorConnection = ClassUtils.instantiate(
+ fromConnector.getConnectionConfigurationClass());
+ FormUtils.fromForms(fromConnection.getConnectorPart().getForms(),
+ fromConnectorConnection);
- Object connectorJob = ClassUtils.instantiate(
- connector.getJobConfigurationClass(job.getType()));
- FormUtils.fromForms(job.getConnectorPart().getForms(), connectorJob);
+ Object fromJob = ClassUtils.instantiate(
+ fromConnector.getJobConfigurationClass(ConnectorType.FROM));
+ FormUtils.fromForms(
+ job.getConnectorPart(ConnectorType.FROM).getForms(), fromJob);
+
+ // Transform forms to toConnector specific classes
+ Object toConnectorConnection = ClassUtils.instantiate(
+ toConnector.getConnectionConfigurationClass());
+ FormUtils.fromForms(toConnection.getConnectorPart().getForms(),
+ toConnectorConnection);
+
+ Object toJob = ClassUtils.instantiate(
+ toConnector.getJobConfigurationClass(ConnectorType.TO));
+ FormUtils.fromForms(job.getConnectorPart(ConnectorType.TO).getForms(), toJob);
// Transform framework specific forms
- Object frameworkConnection = ClassUtils.instantiate(
+ Object fromFrameworkConnection = ClassUtils.instantiate(
FrameworkManager.getInstance().getConnectionConfigurationClass());
- FormUtils.fromForms(connection.getFrameworkPart().getForms(),
- frameworkConnection);
+ Object toFrameworkConnection = ClassUtils.instantiate(
+ FrameworkManager.getInstance().getConnectionConfigurationClass());
+ FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(),
+ fromFrameworkConnection);
+ FormUtils.fromForms(toConnection.getFrameworkPart().getForms(),
+ toFrameworkConnection);
Object frameworkJob = ClassUtils.instantiate(
- FrameworkManager.getInstance().getJobConfigurationClass(job.getType()));
+ FrameworkManager.getInstance().getJobConfigurationClass());
FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob);
// Create request object
@@ -319,12 +337,16 @@ public class JobManager implements Reconfigurable {
// Save important variables to the submission request
request.setSummary(summary);
- request.setConnector(connector);
- request.setConfigConnectorConnection(connectorConnection);
- request.setConfigConnectorJob(connectorJob);
- request.setConfigFrameworkConnection(frameworkConnection);
+ request.setConnector(ConnectorType.FROM, fromConnector);
+ request.setConnector(ConnectorType.TO, toConnector);
+ request.setConnectorConnectionConfig(ConnectorType.FROM, fromConnectorConnection);
+ request.setConnectorConnectionConfig(ConnectorType.TO, toConnectorConnection);
+ request.setConnectorJobConfig(ConnectorType.FROM, fromJob);
+ request.setConnectorJobConfig(ConnectorType.TO, toJob);
+ // @TODO(Abe): Should we actually have 2 different Framework Connection config objects?
+ request.setFrameworkConnectionConfig(ConnectorType.FROM, fromFrameworkConnection);
+ request.setFrameworkConnectionConfig(ConnectorType.TO, toFrameworkConnection);
request.setConfigFrameworkJob(frameworkJob);
- request.setJobType(job.getType());
request.setJobName(job.getName());
request.setJobId(job.getPersistenceId());
request.setNotificationUrl(notificationBaseUrl + jobId);
@@ -342,8 +364,9 @@ public class JobManager implements Reconfigurable {
request.addJarForClass(SqoopConnector.class);
// Execution engine jar
request.addJarForClass(executionEngine.getClass());
- // Connector in use
- request.addJarForClass(connector.getClass());
+ // Connectors in use
+ request.addJarForClass(fromConnector.getClass());
+ request.addJarForClass(toConnector.getClass());
// Extra libraries that Sqoop code requires
request.addJarForClass(JSONValue.class);
@@ -351,67 +374,94 @@ public class JobManager implements Reconfigurable {
// The IDF is used in the ETL process.
request.addJarForClass(dataFormatClass);
- // Get connector callbacks
- switch (job.getType()) {
- case IMPORT:
- request.setConnectorCallbacks(connector.getImporter());
- break;
- case EXPORT:
- request.setConnectorCallbacks(connector.getExporter());
- break;
- default:
- throw new SqoopException(FrameworkError.FRAMEWORK_0005,
- "Unsupported job type " + job.getType().name());
- }
- LOG.debug("Using callbacks: " + request.getConnectorCallbacks());
- // Initialize submission from connector perspective
- CallbackBase baseCallbacks = request.getConnectorCallbacks();
+ // Get callbacks
+ request.setFromCallback(fromConnector.getFrom());
+ request.setToCallback(toConnector.getTo());
+ LOG.debug("Using callbacks: " + request.getFromCallback() + ", " + request.getToCallback());
+
+ // Initialize submission from fromConnector perspective
+ CallbackBase[] baseCallbacks = {
+ request.getFromCallback(),
+ request.getToCallback()
+ };
- Class<? extends Initializer> initializerClass = baseCallbacks
- .getInitializer();
- Initializer initializer = (Initializer) ClassUtils
- .instantiate(initializerClass);
+ CallbackBase baseCallback;
+ Class<? extends Initializer> initializerClass;
+ Initializer initializer;
+ InitializerContext initializerContext;
+
+ // Initialize From Connector callback.
+ baseCallback = request.getFromCallback();
+
+ initializerClass = baseCallback
+ .getInitializer();
+ initializer = (Initializer) ClassUtils
+ .instantiate(initializerClass);
if (initializer == null) {
throw new SqoopException(FrameworkError.FRAMEWORK_0006,
- "Can't create initializer instance: " + initializerClass.getName());
+ "Can't create initializer instance: " + initializerClass.getName());
}
// Initializer context
- InitializerContext initializerContext = new InitializerContext(
- request.getConnectorContext());
+ initializerContext = new InitializerContext(request.getConnectorContext(ConnectorType.FROM));
- // Initialize submission from connector perspective
+ // Initialize submission from fromConnector perspective
initializer.initialize(initializerContext,
- request.getConfigConnectorConnection(),
- request.getConfigConnectorJob());
+ request.getConnectorConnectionConfig(ConnectorType.FROM),
+ request.getConnectorJobConfig(ConnectorType.FROM));
// Add job specific jars to
request.addJars(initializer.getJars(initializerContext,
- request.getConfigConnectorConnection(),
- request.getConfigConnectorJob()));
+ request.getConnectorConnectionConfig(ConnectorType.FROM),
+ request.getConnectorJobConfig(ConnectorType.FROM)));
+ // @TODO(Abe): Alter behavior of Schema here. Need from Schema.
// Retrieve and persist the schema
request.getSummary().setConnectorSchema(initializer.getSchema(
- initializerContext,
- request.getConfigConnectorConnection(),
- request.getConfigConnectorJob()
- ));
+ initializerContext,
+ request.getConnectorConnectionConfig(ConnectorType.FROM),
+ request.getConnectorJobConfig(ConnectorType.FROM)
+ ));
- // Bootstrap job from framework perspective
- switch (job.getType()) {
- case IMPORT:
- prepareImportSubmission(request);
- break;
- case EXPORT:
- prepareExportSubmission(request);
- break;
- default:
- throw new SqoopException(FrameworkError.FRAMEWORK_0005,
- "Unsupported job type " + job.getType().name());
+ // Initialize To Connector callback.
+ baseCallback = request.getToCallback();
+
+ initializerClass = baseCallback
+ .getInitializer();
+ initializer = (Initializer) ClassUtils
+ .instantiate(initializerClass);
+
+ if (initializer == null) {
+ throw new SqoopException(FrameworkError.FRAMEWORK_0006,
+ "Can't create initializer instance: " + initializerClass.getName());
}
+ // Initializer context
+ initializerContext = new InitializerContext(request.getConnectorContext(ConnectorType.TO));
+
+ // Initialize submission from fromConnector perspective
+ initializer.initialize(initializerContext,
+ request.getConnectorConnectionConfig(ConnectorType.TO),
+ request.getConnectorJobConfig(ConnectorType.TO));
+
+ // Add job specific jars to
+ request.addJars(initializer.getJars(initializerContext,
+ request.getConnectorConnectionConfig(ConnectorType.TO),
+ request.getConnectorJobConfig(ConnectorType.TO)));
+
+ // @TODO(Abe): Alter behavior of Schema here. Need To Schema.
+ // Retrieve and persist the schema
+// request.getSummary().setConnectorSchema(initializer.getSchema(
+// initializerContext,
+// request.getConnectorConnectionConfig(ConnectorType.TO),
+// request.getConnectorJobConfig(ConnectorType.TO)
+// ));
+
+ // Bootstrap job from framework perspective
+ prepareSubmission(request);
+
// Make sure that this job id is not currently running and submit the job
// only if it's not.
synchronized (getClass()) {
@@ -421,6 +471,7 @@ public class JobManager implements Reconfigurable {
"Job with id " + jobId);
}
+ // @TODO(Abe): Call multiple destroyers.
// TODO(jarcec): We might need to catch all exceptions here to ensure
// that Destroyer will be executed in all cases.
boolean submitted = submissionEngine.submit(request);
@@ -436,12 +487,9 @@ public class JobManager implements Reconfigurable {
return summary;
}
- private void prepareImportSubmission(SubmissionRequest request) {
- ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request
- .getConfigFrameworkJob();
-
- // Initialize the map-reduce part (all sort of required classes, ...)
- request.setOutputDirectory(jobConfiguration.output.outputDirectory);
+ private void prepareSubmission(SubmissionRequest request) {
+ JobConfiguration jobConfiguration = (JobConfiguration) request
+ .getConfigFrameworkJob();
// We're directly moving configured number of extractors and loaders to
// underlying request object. In the future we might need to throttle this
@@ -450,21 +498,7 @@ public class JobManager implements Reconfigurable {
request.setLoaders(jobConfiguration.throttling.loaders);
// Delegate rest of the job to execution engine
- executionEngine.prepareImportSubmission(request);
- }
-
- private void prepareExportSubmission(SubmissionRequest request) {
- ExportJobConfiguration jobConfiguration = (ExportJobConfiguration) request
- .getConfigFrameworkJob();
-
- // We're directly moving configured number of extractors and loaders to
- // underlying request object. In the future we might need to throttle this
- // count based on other running jobs to meet our SLAs.
- request.setExtractors(jobConfiguration.throttling.extractors);
- request.setLoaders(jobConfiguration.throttling.loaders);
-
- // Delegate rest of the job to execution engine
- executionEngine.prepareExportSubmission(request);
+ executionEngine.prepareSubmission(request);
}
/**
@@ -472,23 +506,37 @@ public class JobManager implements Reconfigurable {
* remote cluster.
*/
private void destroySubmission(SubmissionRequest request) {
- CallbackBase baseCallbacks = request.getConnectorCallbacks();
+ CallbackBase fromCallback = request.getFromCallback();
+ CallbackBase toCallback = request.getToCallback();
- Class<? extends Destroyer> destroyerClass = baseCallbacks.getDestroyer();
- Destroyer destroyer = (Destroyer) ClassUtils.instantiate(destroyerClass);
+ Class<? extends Destroyer> fromDestroyerClass = fromCallback.getDestroyer();
+ Class<? extends Destroyer> toDestroyerClass = toCallback.getDestroyer();
+ Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromDestroyerClass);
+ Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toDestroyerClass);
- if (destroyer == null) {
+ if (fromDestroyer == null) {
throw new SqoopException(FrameworkError.FRAMEWORK_0006,
- "Can't create destroyer instance: " + destroyerClass.getName());
+ "Can't create toDestroyer instance: " + fromDestroyerClass.getName());
}
- DestroyerContext destroyerContext = new DestroyerContext(
- request.getConnectorContext(), false, request.getSummary()
+ if (toDestroyer == null) {
+ throw new SqoopException(FrameworkError.FRAMEWORK_0006,
+ "Can't create toDestroyer instance: " + toDestroyerClass.getName());
+ }
+
+ // @TODO(Abe): Update context to manage multiple connectors. As well as summary.
+ DestroyerContext fromDestroyerContext = new DestroyerContext(
+ request.getConnectorContext(ConnectorType.FROM), false, request.getSummary()
+ .getConnectorSchema());
+ DestroyerContext toDestroyerContext = new DestroyerContext(
+ request.getConnectorContext(ConnectorType.TO), false, request.getSummary()
.getConnectorSchema());
// Initialize submission from connector perspective
- destroyer.destroy(destroyerContext, request.getConfigConnectorConnection(),
- request.getConfigConnectorJob());
+ fromDestroyer.destroy(fromDestroyerContext, request.getConnectorConnectionConfig(ConnectorType.FROM),
+ request.getConnectorJobConfig(ConnectorType.FROM));
+ toDestroyer.destroy(toDestroyerContext, request.getConnectorConnectionConfig(ConnectorType.TO),
+ request.getConnectorJobConfig(ConnectorType.TO));
}
public MSubmission stop(long jobId, HttpEventContext ctx) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index 7900eee..1645036 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -17,16 +17,18 @@
*/
package org.apache.sqoop.framework;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.etl.CallbackBase;
-import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.utils.ClassUtils;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
/**
* Submission details class is used when creating new submission and contains
@@ -51,14 +53,9 @@ public class SubmissionRequest {
long jobId;
/**
- * Job type
- */
- MJob.Type jobType;
-
- /**
* Connector instance associated with this submission request
*/
- SqoopConnector connector;
+ Map<ConnectorType, SqoopConnector > connectors;
/**
* List of required local jars for the job
@@ -66,22 +63,27 @@ public class SubmissionRequest {
List<String> jars;
/**
- * Base callbacks that are independent on job type
+ * From connector callback
+ */
+ CallbackBase fromCallback;
+
+ /**
+ * To connector callback
*/
- CallbackBase connectorCallbacks;
+ CallbackBase toCallback;
/**
- * All 4 configuration objects
+ * All configuration objects
*/
- Object configConnectorConnection;
- Object configConnectorJob;
- Object configFrameworkConnection;
+ Map<ConnectorType, Object> connectorConnectionConfigs;
+ Map<ConnectorType, Object> connectorJobConfigs;
+ Map<ConnectorType, Object> frameworkConnectionConfigs;
Object configFrameworkJob;
/**
* Connector context (submission specific configuration)
*/
- MutableMapContext connectorContext;
+ Map<ConnectorType, MutableMapContext> connectorContexts;
/**
* Framework context (submission specific configuration)
@@ -115,8 +117,17 @@ public class SubmissionRequest {
public SubmissionRequest() {
this.jars = new LinkedList<String>();
- this.connectorContext = new MutableMapContext();
+ this.connectorContexts = new HashMap<ConnectorType, MutableMapContext>();
+
+ this.connectorContexts.put(ConnectorType.FROM, new MutableMapContext());
+ this.connectorContexts.put(ConnectorType.TO, new MutableMapContext());
this.frameworkContext = new MutableMapContext();
+
+ this.connectorConnectionConfigs = new HashMap<ConnectorType, Object>();
+ this.connectorJobConfigs = new HashMap<ConnectorType, Object>();
+ this.frameworkConnectionConfigs = new HashMap<ConnectorType, Object>();
+
+ this.connectors = new HashMap<ConnectorType, SqoopConnector>();
}
public MSubmission getSummary() {
@@ -143,20 +154,12 @@ public class SubmissionRequest {
this.jobId = jobId;
}
- public MJob.Type getJobType() {
- return jobType;
- }
-
- public void setJobType(MJob.Type jobType) {
- this.jobType = jobType;
+ public SqoopConnector getConnector(ConnectorType type) {
+ return connectors.get(type);
}
- public SqoopConnector getConnector() {
- return connector;
- }
-
- public void setConnector(SqoopConnector connector) {
- this.connector = connector;
+ public void setConnector(ConnectorType type, SqoopConnector connector) {
+ this.connectors.put(type, connector);
}
public List<String> getJars() {
@@ -179,36 +182,44 @@ public class SubmissionRequest {
}
}
- public CallbackBase getConnectorCallbacks() {
- return connectorCallbacks;
+ public CallbackBase getFromCallback() {
+ return fromCallback;
+ }
+
+ public void setFromCallback(CallbackBase fromCallback) {
+ this.fromCallback = fromCallback;
+ }
+
+ public CallbackBase getToCallback() {
+ return toCallback;
}
- public void setConnectorCallbacks(CallbackBase connectorCallbacks) {
- this.connectorCallbacks = connectorCallbacks;
+ public void setToCallback(CallbackBase toCallback) {
+ this.toCallback = toCallback;
}
- public Object getConfigConnectorConnection() {
- return configConnectorConnection;
+ public Object getConnectorConnectionConfig(ConnectorType type) {
+ return connectorConnectionConfigs.get(type);
}
- public void setConfigConnectorConnection(Object config) {
- configConnectorConnection = config;
+ public void setConnectorConnectionConfig(ConnectorType type, Object config) {
+ connectorConnectionConfigs.put(type, config);
}
- public Object getConfigConnectorJob() {
- return configConnectorJob;
+ public Object getConnectorJobConfig(ConnectorType type) {
+ return connectorJobConfigs.get(type);
}
- public void setConfigConnectorJob(Object config) {
- configConnectorJob = config;
+ public void setConnectorJobConfig(ConnectorType type, Object config) {
+ connectorJobConfigs.put(type, config);
}
- public Object getConfigFrameworkConnection() {
- return configFrameworkConnection;
+ public Object getFrameworkConnectionConfig(ConnectorType type) {
+ return frameworkConnectionConfigs.get(type);
}
- public void setConfigFrameworkConnection(Object config) {
- configFrameworkConnection = config;
+ public void setFrameworkConnectionConfig(ConnectorType type, Object config) {
+ frameworkConnectionConfigs.put(type, config);
}
public Object getConfigFrameworkJob() {
@@ -219,22 +230,14 @@ public class SubmissionRequest {
configFrameworkJob = config;
}
- public MutableMapContext getConnectorContext() {
- return connectorContext;
+ public MutableMapContext getConnectorContext(ConnectorType type) {
+ return connectorContexts.get(type);
}
public MutableMapContext getFrameworkContext() {
return frameworkContext;
}
- public String getOutputDirectory() {
- return outputDirectory;
- }
-
- public void setOutputDirectory(String outputDirectory) {
- this.outputDirectory = outputDirectory;
- }
-
public String getNotificationUrl() {
return notificationUrl;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
new file mode 100644
index 0000000..7c653bf
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.framework.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+@ConfigurationClass
+public class JobConfiguration {
+
+ @Form public ThrottlingForm throttling;
+
+ public JobConfiguration() {
+ throttling = new ThrottlingForm();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index ecf5004..5087a39 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -18,6 +18,7 @@
package org.apache.sqoop.repository;
import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.MetadataUpgrader;
@@ -37,7 +38,6 @@ import org.apache.sqoop.utils.ClassUtils;
import org.apache.sqoop.validation.Validation;
import org.apache.sqoop.validation.Validator;
-import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -446,16 +446,18 @@ public abstract class Repository {
// Make a new copy of the forms from the connector,
// else the values will get set in the forms in the connector for
// each connection.
- List<MForm> forms = newConnector.getJobForms(job.getType()).clone(false).getForms();
- MJobForms newJobForms = new MJobForms(job.getType(), forms);
- upgrader.upgrade(job.getConnectorPart(), newJobForms);
- MJob newJob = new MJob(job, newJobForms, job.getFrameworkPart());
+ List<MForm> forms = newConnector.getJobForms(ConnectorType.FROM).clone(false).getForms();
+ MJobForms newJobForms = new MJobForms(forms);
+ upgrader.upgrade(job.getConnectorPart(ConnectorType.FROM), newJobForms);
+ // @TODO(Abe): Check From and To
+ MJob newJob = new MJob(job, newJobForms, job.getFrameworkPart(), newJobForms);
// Transform form structures to objects for validations
- Object newConfigurationObject = ClassUtils.instantiate(connector.getJobConfigurationClass(job.getType()));
- FormUtils.fromForms(newJob.getConnectorPart().getForms(), newConfigurationObject);
+ // @TODO(Abe): Check From and To
+ Object newConfigurationObject = ClassUtils.instantiate(connector.getJobConfigurationClass(ConnectorType.FROM));
+ FormUtils.fromForms(newJob.getConnectorPart(ConnectorType.FROM).getForms(), newConfigurationObject);
- Validation validation = validator.validateJob(newJob.getType(), newConfigurationObject);
+ Validation validation = validator.validateJob(newConfigurationObject);
if (validation.getStatus().canProceed()) {
updateJob(newJob, tx);
} else {
@@ -509,6 +511,7 @@ public abstract class Repository {
// Make a new copy of the forms from the connector,
// else the values will get set in the forms in the connector for
// each connection.
+ // @TODO(Abe): From/To connection forms.
List<MForm> forms = framework.getConnectionForms().clone(false).getForms();
MConnectionForms newConnectionForms = new MConnectionForms(forms);
upgrader.upgrade(connection.getFrameworkPart(), newConnectionForms);
@@ -530,16 +533,16 @@ public abstract class Repository {
// Make a new copy of the forms from the framework,
// else the values will get set in the forms in the connector for
// each connection.
- List<MForm> forms = framework.getJobForms(job.getType()).clone(false).getForms();
- MJobForms newJobForms = new MJobForms(job.getType(), forms);
+ List<MForm> forms = framework.getJobForms().clone(false).getForms();
+ MJobForms newJobForms = new MJobForms(forms);
upgrader.upgrade(job.getFrameworkPart(), newJobForms);
- MJob newJob = new MJob(job, job.getConnectorPart(), newJobForms);
+ MJob newJob = new MJob(job, job.getConnectorPart(ConnectorType.FROM), newJobForms, job.getConnectorPart(ConnectorType.TO));
// Transform form structures to objects for validations
- Object newConfigurationObject = ClassUtils.instantiate(FrameworkManager.getInstance().getJobConfigurationClass(job.getType()));
+ Object newConfigurationObject = ClassUtils.instantiate(FrameworkManager.getInstance().getJobConfigurationClass());
FormUtils.fromForms(newJob.getFrameworkPart().getForms(), newConfigurationObject);
- Validation validation = validator.validateJob(newJob.getType(), newConfigurationObject);
+ Validation validation = validator.validateJob(newConfigurationObject);
if (validation.getStatus().canProceed()) {
updateJob(newJob, tx);
} else {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 84f6213..82b195a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -20,22 +20,14 @@ package org.apache.sqoop.execution.mapreduce;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.common.MutableMapContext;
-import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.framework.ExecutionEngine;
import org.apache.sqoop.framework.SubmissionRequest;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
-import org.apache.sqoop.framework.configuration.OutputFormat;
+import org.apache.sqoop.framework.configuration.JobConfiguration;
import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.etl.Exporter;
-import org.apache.sqoop.job.etl.HdfsExportExtractor;
-import org.apache.sqoop.job.etl.HdfsExportPartitioner;
-import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
-import org.apache.sqoop.job.etl.HdfsTextImportLoader;
-import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
+import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.SqoopWritable;
-import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
@@ -69,99 +61,66 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
request.setOutputValueClass(NullWritable.class);
// Set up framework context
+ From from = (From)request.getFromCallback();
+ To to = (To)request.getToCallback();
MutableMapContext context = request.getFrameworkContext();
+ context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
+ context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
+ context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName());
+ context.setString(JobConstants.JOB_ETL_DESTROYER, from.getDestroyer().getName());
context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
- request.getIntermediateDataFormat().getName());
+ request.getIntermediateDataFormat().getName());
if(request.getExtractors() != null) {
context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
}
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void prepareImportSubmission(SubmissionRequest gRequest) {
- MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
-
- prepareSubmission(request);
- request.setOutputFormatClass(SqoopFileOutputFormat.class);
- ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob();
-
- Importer importer = (Importer)request.getConnectorCallbacks();
-
- // Set up framework context
- MutableMapContext context = request.getFrameworkContext();
- context.setString(JobConstants.JOB_ETL_PARTITIONER, importer.getPartitioner().getName());
- context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName());
- context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName());
-
- // TODO: This settings should be abstracted to core module at some point
- if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) {
- context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
- } else if(jobConf.output.outputFormat == OutputFormat.SEQUENCE_FILE) {
- context.setString(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
- } else {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024,
- "Format: " + jobConf.output.outputFormat);
- }
- if(getCompressionCodecName(jobConf) != null) {
- context.setString(JobConstants.HADOOP_COMPRESS_CODEC,
- getCompressionCodecName(jobConf));
- context.setBoolean(JobConstants.HADOOP_COMPRESS, true);
+ if(request.getExtractors() != null) {
+ context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
}
- }
- private String getCompressionCodecName(ImportJobConfiguration jobConf) {
- if(jobConf.output.compression == null)
- return null;
- switch(jobConf.output.compression) {
- case NONE:
- return null;
- case DEFAULT:
- return "org.apache.hadoop.io.compress.DefaultCodec";
- case DEFLATE:
- return "org.apache.hadoop.io.compress.DeflateCodec";
- case GZIP:
- return "org.apache.hadoop.io.compress.GzipCodec";
- case BZIP2:
- return "org.apache.hadoop.io.compress.BZip2Codec";
- case LZO:
- return "com.hadoop.compression.lzo.LzoCodec";
- case LZ4:
- return "org.apache.hadoop.io.compress.Lz4Codec";
- case SNAPPY:
- return "org.apache.hadoop.io.compress.SnappyCodec";
- case CUSTOM:
- return jobConf.output.customCompression.trim();
- }
- return null;
+ // @TODO(Abe): Move to HDFS connector.
+// if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) {
+// context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+// } else if(jobConf.output.outputFormat == OutputFormat.SEQUENCE_FILE) {
+// context.setString(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
+// } else {
+// throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024,
+// "Format: " + jobConf.output.outputFormat);
+// }
+// if(getCompressionCodecName(jobConf) != null) {
+// context.setString(JobConstants.HADOOP_COMPRESS_CODEC,
+// getCompressionCodecName(jobConf));
+// context.setBoolean(JobConstants.HADOOP_COMPRESS, true);
+// }
}
- /**
- * {@inheritDoc}
- */
- @Override
- public void prepareExportSubmission(SubmissionRequest gRequest) {
- MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
- ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob();
-
- prepareSubmission(request);
-
- Exporter exporter = (Exporter)request.getConnectorCallbacks();
-
- // Set up framework context
- MutableMapContext context = request.getFrameworkContext();
- context.setString(JobConstants.JOB_ETL_PARTITIONER, HdfsExportPartitioner.class.getName());
- context.setString(JobConstants.JOB_ETL_LOADER, exporter.getLoader().getName());
- context.setString(JobConstants.JOB_ETL_DESTROYER, exporter.getDestroyer().getName());
-
- // Extractor that will be able to read all supported file types
- context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName());
- context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory);
- }
+ // @TODO(Abe): Move to HDFS connector.
+// private String getCompressionCodecName(ImportJobConfiguration jobConf) {
+// if(jobConf.output.compression == null)
+// return null;
+// switch(jobConf.output.compression) {
+// case NONE:
+// return null;
+// case DEFAULT:
+// return "org.apache.hadoop.io.compress.DefaultCodec";
+// case DEFLATE:
+// return "org.apache.hadoop.io.compress.DeflateCodec";
+// case GZIP:
+// return "org.apache.hadoop.io.compress.GzipCodec";
+// case BZIP2:
+// return "org.apache.hadoop.io.compress.BZip2Codec";
+// case LZO:
+// return "com.hadoop.compression.lzo.LzoCodec";
+// case LZ4:
+// return "org.apache.hadoop.io.compress.Lz4Codec";
+// case SNAPPY:
+// return "org.apache.hadoop.io.compress.SnappyCodec";
+// case CUSTOM:
+// return jobConf.output.customCompression.trim();
+// }
+// return null;
+// }
/**
* Our execution engine have additional dependencies that needs to be available
http://git-wip-us.apache.org/repos/asf/sqoop/blob/0f90d7d3/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index b2fa15d..4cdb002 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -51,8 +51,11 @@ public final class JobConstants extends Constants {
public static final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG
+ "etl.extractor.count";
- public static final String PREFIX_CONNECTOR_CONTEXT =
- PREFIX_JOB_CONFIG + "connector.context.";
+ public static final String PREFIX_CONNECTOR_FROM_CONTEXT =
+ PREFIX_JOB_CONFIG + "connector.from.context.";
+
+ public static final String PREFIX_CONNECTOR_TO_CONTEXT =
+ PREFIX_JOB_CONFIG + "connector.to.context.";
// Hadoop specific constants
// We're using constants from Hadoop 1. Hadoop 2 has different names, but