You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/08/08 22:12:23 UTC
[5/5] git commit: SQOOP-1376: Sqoop2: From/To: Refactor connector
interface
SQOOP-1376: Sqoop2: From/To: Refactor connector interface
(Abraham Elmahrek via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/dfd30036
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/dfd30036
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/dfd30036
Branch: refs/heads/SQOOP-1367
Commit: dfd300366084e81b9a9c31fe41c89ed8be961462
Parents: d883557
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Aug 8 13:11:35 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Aug 8 13:11:35 2014 -0700
----------------------------------------------------------------------
.../org/apache/sqoop/client/SqoopClient.java | 44 +-
.../sqoop/client/request/ConnectionRequest.java | 18 +-
.../apache/sqoop/client/request/JobRequest.java | 10 +-
.../sqoop/client/request/SqoopRequests.java | 14 +-
.../org/apache/sqoop/common/ConnectorType.java | 30 +
.../sqoop/json/ConnectionValidationBean.java | 143 +++++
.../org/apache/sqoop/json/ConnectorBean.java | 37 +-
.../org/apache/sqoop/json/FrameworkBean.java | 29 +-
.../java/org/apache/sqoop/json/JobBean.java | 51 +-
.../apache/sqoop/json/JobValidationBean.java | 157 +++++
.../org/apache/sqoop/json/ValidationBean.java | 143 -----
.../java/org/apache/sqoop/model/MConnector.java | 72 ++-
.../java/org/apache/sqoop/model/MFramework.java | 55 +-
.../main/java/org/apache/sqoop/model/MJob.java | 131 ++--
.../java/org/apache/sqoop/model/MJobForms.java | 30 +-
.../connector/jdbc/GenericJdbcConnector.java | 46 +-
.../jdbc/GenericJdbcConnectorConstants.java | 6 +-
.../jdbc/GenericJdbcExportDestroyer.java | 62 --
.../jdbc/GenericJdbcExportInitializer.java | 171 ------
.../connector/jdbc/GenericJdbcExportLoader.java | 76 ---
.../connector/jdbc/GenericJdbcExtractor.java | 78 +++
.../jdbc/GenericJdbcFromDestroyer.java | 36 ++
.../jdbc/GenericJdbcFromInitializer.java | 321 ++++++++++
.../jdbc/GenericJdbcImportDestroyer.java | 36 --
.../jdbc/GenericJdbcImportExtractor.java | 78 ---
.../jdbc/GenericJdbcImportInitializer.java | 321 ----------
.../jdbc/GenericJdbcImportPartition.java | 53 --
.../jdbc/GenericJdbcImportPartitioner.java | 605 -------------------
.../sqoop/connector/jdbc/GenericJdbcLoader.java | 76 +++
.../connector/jdbc/GenericJdbcPartition.java | 53 ++
.../connector/jdbc/GenericJdbcPartitioner.java | 604 ++++++++++++++++++
.../connector/jdbc/GenericJdbcToDestroyer.java | 62 ++
.../jdbc/GenericJdbcToInitializer.java | 171 ++++++
.../connector/jdbc/GenericJdbcValidator.java | 24 +-
.../configuration/ExportJobConfiguration.java | 33 -
.../jdbc/configuration/ExportTableForm.java | 34 --
.../configuration/FromJobConfiguration.java | 33 +
.../jdbc/configuration/FromTableForm.java | 35 ++
.../configuration/ImportJobConfiguration.java | 33 -
.../jdbc/configuration/ImportTableForm.java | 35 --
.../jdbc/configuration/ToJobConfiguration.java | 33 +
.../jdbc/configuration/ToTableForm.java | 34 ++
.../connector/jdbc/TestExportInitializer.java | 2 +-
.../sqoop/connector/jdbc/TestExportLoader.java | 2 +-
.../connector/jdbc/TestImportExtractor.java | 2 +-
.../connector/jdbc/TestImportInitializer.java | 2 +-
.../connector/jdbc/TestImportPartitioner.java | 2 +-
.../connector/mysqljdbc/MySqlJdbcConnector.java | 8 +-
.../sqoop/connector/ConnectorHandler.java | 22 +-
.../apache/sqoop/framework/ExecutionEngine.java | 10 +-
.../sqoop/framework/FrameworkManager.java | 32 +-
.../sqoop/framework/FrameworkValidator.java | 102 ++--
.../org/apache/sqoop/framework/JobManager.java | 243 +++++---
.../sqoop/framework/SubmissionRequest.java | 109 ++--
.../configuration/JobConfiguration.java | 31 +
.../org/apache/sqoop/repository/Repository.java | 29 +-
.../mapreduce/MapreduceExecutionEngine.java | 149 ++---
.../java/org/apache/sqoop/job/JobConstants.java | 7 +-
.../sqoop/job/etl/HdfsExportExtractor.java | 316 +++++-----
.../apache/sqoop/job/mr/ConfigurationUtils.java | 187 ++++--
.../sqoop/job/mr/SqoopDestroyerExecutor.java | 14 +-
.../apache/sqoop/job/mr/SqoopInputFormat.java | 10 +-
.../org/apache/sqoop/job/mr/SqoopMapper.java | 27 +-
.../job/mr/SqoopOutputFormatLoadExecutor.java | 26 +-
.../org/apache/sqoop/job/TestHdfsExtract.java | 2 +-
.../derby/DerbyRepositoryHandler.java | 278 ++++++---
.../repository/derby/DerbySchemaConstants.java | 4 +-
.../repository/derby/DerbySchemaQuery.java | 72 ++-
.../sqoop/handler/ConnectionRequestHandler.java | 6 +-
.../apache/sqoop/handler/JobRequestHandler.java | 63 +-
.../apache/sqoop/shell/CloneJobFunction.java | 8 +-
.../apache/sqoop/shell/CreateJobFunction.java | 37 +-
.../sqoop/shell/DeleteConnectionFunction.java | 2 +-
.../sqoop/shell/ShowConnectionFunction.java | 8 +-
.../org/apache/sqoop/shell/ShowJobFunction.java | 27 +-
.../apache/sqoop/shell/UpdateJobFunction.java | 8 +-
.../org/apache/sqoop/shell/core/Constants.java | 22 +-
.../apache/sqoop/shell/utils/FormDisplayer.java | 37 +-
.../apache/sqoop/shell/utils/FormFiller.java | 56 +-
.../shell/utils/JobDynamicFormOptions.java | 6 +-
.../main/resources/shell-resource.properties | 9 +-
.../sqoop/connector/spi/SqoopConnector.java | 16 +-
.../java/org/apache/sqoop/job/etl/Exporter.java | 51 --
.../java/org/apache/sqoop/job/etl/From.java | 58 ++
.../java/org/apache/sqoop/job/etl/Importer.java | 58 --
.../main/java/org/apache/sqoop/job/etl/To.java | 51 ++
.../org/apache/sqoop/validation/Validator.java | 3 +-
.../mapreduce/MapreduceSubmissionEngine.java | 34 +-
88 files changed, 3401 insertions(+), 2960 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
index 05ea6d6..b42f234 100644
--- a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
+++ b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
@@ -18,10 +18,12 @@
package org.apache.sqoop.client;
import org.apache.sqoop.client.request.SqoopRequests;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.json.ConnectionValidationBean;
import org.apache.sqoop.json.ConnectorBean;
import org.apache.sqoop.json.FrameworkBean;
-import org.apache.sqoop.json.ValidationBean;
+import org.apache.sqoop.json.JobValidationBean;
import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnector;
@@ -351,21 +353,24 @@ public class SqoopClient {
}
/**
- * Create new job of given type and for given connection.
+ * Create new job for the given connections.
*
- * @param xid Connection id
- * @param type Job type
+ * @param fromXid Id of the connection that data is read FROM
+ * @param toXid Id of the connection that data is written TO
* @return
*/
- public MJob newJob(long xid, MJob.Type type) {
- MConnection connection = getConnection(xid);
+ public MJob newJob(long fromXid, long toXid) {
+ MConnection fromConnection = getConnection(fromXid);
+ MConnection toConnection = getConnection(toXid);
return new MJob(
- connection.getConnectorId(),
- connection.getPersistenceId(),
- type,
- getConnector(connection.getConnectorId()).getJobForms(type),
- getFramework().getJobForms(type)
+ fromConnection.getConnectorId(),
+ toConnection.getConnectorId(),
+ fromConnection.getPersistenceId(),
+ toConnection.getPersistenceId(),
+ getConnector(fromConnection.getConnectorId()).getJobForms(ConnectorType.FROM),
+ getConnector(toConnection.getConnectorId()).getJobForms(ConnectorType.TO),
+ getFramework().getJobForms()
);
}
@@ -529,7 +534,7 @@ public class SqoopClient {
return requests.readHistory(jid).getSubmissions();
}
- private Status applyValidations(ValidationBean bean, MConnection connection) {
+ private Status applyValidations(ConnectionValidationBean bean, MConnection connection) {
Validation connector = bean.getConnectorValidation();
Validation framework = bean.getFrameworkValidation();
@@ -544,18 +549,25 @@ public class SqoopClient {
return Status.getWorstStatus(connector.getStatus(), framework.getStatus());
}
- private Status applyValidations(ValidationBean bean, MJob job) {
- Validation connector = bean.getConnectorValidation();
+ private Status applyValidations(JobValidationBean bean, MJob job) {
+ Validation fromConnector = bean.getConnectorValidation(ConnectorType.FROM);
+ Validation toConnector = bean.getConnectorValidation(ConnectorType.TO);
Validation framework = bean.getFrameworkValidation();
- FormUtils.applyValidation(job.getConnectorPart().getForms(), connector);
+ // @TODO(Abe): From/To validation.
+ FormUtils.applyValidation(
+ job.getConnectorPart(ConnectorType.FROM).getForms(),
+ fromConnector);
FormUtils.applyValidation(job.getFrameworkPart().getForms(), framework);
+ FormUtils.applyValidation(
+ job.getConnectorPart(ConnectorType.TO).getForms(),
+ toConnector);
Long id = bean.getId();
if(id != null) {
job.setPersistenceId(id);
}
- return Status.getWorstStatus(connector.getStatus(), framework.getStatus());
+ return Status.getWorstStatus(fromConnector.getStatus(), framework.getStatus(), toConnector.getStatus());
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java b/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java
index f523abb..e0740a9 100644
--- a/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java
+++ b/client/src/main/java/org/apache/sqoop/client/request/ConnectionRequest.java
@@ -18,7 +18,7 @@
package org.apache.sqoop.client.request;
import org.apache.sqoop.json.ConnectionBean;
-import org.apache.sqoop.json.ValidationBean;
+import org.apache.sqoop.json.ConnectionValidationBean;
import org.apache.sqoop.model.MConnection;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -49,7 +49,7 @@ public class ConnectionRequest extends Request {
return connectionBean;
}
- public ValidationBean create(String serverUrl, MConnection connection) {
+ public ConnectionValidationBean create(String serverUrl, MConnection connection) {
ConnectionBean connectionBean = new ConnectionBean(connection);
@@ -59,13 +59,13 @@ public class ConnectionRequest extends Request {
String response = super.post(serverUrl + RESOURCE,
connectionJson.toJSONString());
- ValidationBean validationBean = new ValidationBean();
- validationBean.restore((JSONObject) JSONValue.parse(response));
+ ConnectionValidationBean connectionValidationBean = new ConnectionValidationBean();
+ connectionValidationBean.restore((JSONObject) JSONValue.parse(response));
- return validationBean;
+ return connectionValidationBean;
}
- public ValidationBean update(String serverUrl, MConnection connection) {
+ public ConnectionValidationBean update(String serverUrl, MConnection connection) {
ConnectionBean connectionBean = new ConnectionBean(connection);
@@ -76,10 +76,10 @@ public class ConnectionRequest extends Request {
+ connection.getPersistenceId(),
connectionJson.toJSONString());
- ValidationBean validationBean = new ValidationBean();
- validationBean.restore((JSONObject) JSONValue.parse(response));
+ ConnectionValidationBean connectionValidationBean = new ConnectionValidationBean();
+ connectionValidationBean.restore((JSONObject) JSONValue.parse(response));
- return validationBean;
+ return connectionValidationBean;
}
public void delete(String serverUrl, Long id) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java b/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java
index 6dee2c8..b824512 100644
--- a/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java
+++ b/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java
@@ -18,7 +18,7 @@
package org.apache.sqoop.client.request;
import org.apache.sqoop.json.JobBean;
-import org.apache.sqoop.json.ValidationBean;
+import org.apache.sqoop.json.JobValidationBean;
import org.apache.sqoop.model.MJob;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -49,7 +49,7 @@ public class JobRequest extends Request {
return jobBean;
}
- public ValidationBean create(String serverUrl, MJob job) {
+ public JobValidationBean create(String serverUrl, MJob job) {
JobBean jobBean = new JobBean(job);
@@ -59,13 +59,13 @@ public class JobRequest extends Request {
String response = super.post(serverUrl + RESOURCE,
jobJson.toJSONString());
- ValidationBean validationBean = new ValidationBean();
+ JobValidationBean validationBean = new JobValidationBean();
validationBean.restore((JSONObject) JSONValue.parse(response));
return validationBean;
}
- public ValidationBean update(String serverUrl, MJob job) {
+ public JobValidationBean update(String serverUrl, MJob job) {
JobBean jobBean = new JobBean(job);
@@ -75,7 +75,7 @@ public class JobRequest extends Request {
String response = super.put(serverUrl + RESOURCE + job.getPersistenceId(),
jobJson.toJSONString());
- ValidationBean validationBean = new ValidationBean();
+ JobValidationBean validationBean = new JobValidationBean();
validationBean.restore((JSONObject) JSONValue.parse(response));
return validationBean;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java b/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java
index ffaa84f..d87bb78 100644
--- a/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java
+++ b/client/src/main/java/org/apache/sqoop/client/request/SqoopRequests.java
@@ -18,16 +18,14 @@
package org.apache.sqoop.client.request;
import org.apache.sqoop.json.ConnectionBean;
+import org.apache.sqoop.json.ConnectionValidationBean;
import org.apache.sqoop.json.ConnectorBean;
import org.apache.sqoop.json.FrameworkBean;
import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.json.JobValidationBean;
import org.apache.sqoop.json.SubmissionBean;
-import org.apache.sqoop.json.ValidationBean;
-import org.apache.sqoop.model.FormUtils;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MJob;
-import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.Validation;
/**
* Unified class for all request objects.
@@ -94,7 +92,7 @@ public class SqoopRequests {
return getConnectorRequest().read(serverUrl, cid);
}
- public ValidationBean createConnection(MConnection connection) {
+ public ConnectionValidationBean createConnection(MConnection connection) {
return getConnectionRequest().create(serverUrl, connection);
}
@@ -102,7 +100,7 @@ public class SqoopRequests {
return getConnectionRequest().read(serverUrl, connectionId);
}
- public ValidationBean updateConnection(MConnection connection) {
+ public ConnectionValidationBean updateConnection(MConnection connection) {
return getConnectionRequest().update(serverUrl, connection);
}
@@ -114,7 +112,7 @@ public class SqoopRequests {
getConnectionRequest().delete(serverUrl, xid);
}
- public ValidationBean createJob(MJob job) {
+ public JobValidationBean createJob(MJob job) {
return getJobRequest().create(serverUrl, job);
}
@@ -122,7 +120,7 @@ public class SqoopRequests {
return getJobRequest().read(serverUrl, jobId);
}
- public ValidationBean updateJob(MJob job) {
+ public JobValidationBean updateJob(MJob job) {
return getJobRequest().update(serverUrl, job);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/common/ConnectorType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/common/ConnectorType.java b/common/src/main/java/org/apache/sqoop/common/ConnectorType.java
new file mode 100644
index 0000000..d3d1d19
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/common/ConnectorType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.common;
+
+/**
+ * Connectors will have configurations for FROM and TO.
+ * If the connector is being used to extract data FROM,
+ * then the connector type will be FROM. If the connector
+ * is being used to load data TO, then the connector type
+ * will be TO.
+ */
+public enum ConnectorType {
+ FROM,
+ TO
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/json/ConnectionValidationBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/ConnectionValidationBean.java b/common/src/main/java/org/apache/sqoop/json/ConnectionValidationBean.java
new file mode 100644
index 0000000..ffdd13e
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/json/ConnectionValidationBean.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.json;
+
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.Validation;
+import org.json.simple.JSONObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bean for sending validations across the network. This bean carries two
+ * validation objects at once - one for the connector part and one for the
+ * framework part of the validated entity. Optionally it can also transfer the
+ * persistent id that was assigned when a new entity was created.
+ */
+public class ConnectionValidationBean implements JsonBean {
+
+ private static final String ID = "id";
+ private static final String FRAMEWORK = "framework";
+ private static final String CONNECTOR = "connector";
+ private static final String STATUS = "status";
+ private static final String MESSAGE = "message";
+ private static final String MESSAGES = "messages";
+
+ private Long id;
+ private Validation connectorValidation;
+ private Validation frameworkValidation;
+
+ // For "extract"
+ public ConnectionValidationBean(Validation connector, Validation framework) {
+ this();
+
+ this.connectorValidation = connector;
+ this.frameworkValidation = framework;
+ }
+
+ // For "restore"
+ public ConnectionValidationBean() {
+ id = null;
+ }
+
+ public Validation getConnectorValidation() {
+ return connectorValidation;
+ }
+
+ public Validation getFrameworkValidation() {
+ return frameworkValidation;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ public Long getId() {
+ return id;
+ }
+
+ @SuppressWarnings("unchecked")
+ public JSONObject extract(boolean skipSensitive) {
+ JSONObject object = new JSONObject();
+
+ // Optionally transfer id
+ if(id != null) {
+ object.put(ID, id);
+ }
+
+ object.put(CONNECTOR, extractValidation(connectorValidation));
+ object.put(FRAMEWORK, extractValidation(frameworkValidation));
+
+ return object;
+ }
+
+ @SuppressWarnings("unchecked")
+ private JSONObject extractValidation(Validation validation) {
+ JSONObject object = new JSONObject();
+
+ object.put(STATUS, validation.getStatus().name());
+
+ JSONObject jsonMessages = new JSONObject();
+ Map<Validation.FormInput, Validation.Message> messages = validation.getMessages();
+
+ for(Map.Entry<Validation.FormInput, Validation.Message> entry : messages.entrySet()) {
+ JSONObject jsonEntry = new JSONObject();
+ jsonEntry.put(STATUS, entry.getValue().getStatus().name());
+ jsonEntry.put(MESSAGE, entry.getValue().getMessage());
+ jsonMessages.put(entry.getKey(), jsonEntry);
+ }
+
+ object.put(MESSAGES, jsonMessages);
+
+ return object;
+ }
+
+ @Override
+ public void restore(JSONObject jsonObject) {
+ // Optional and accepting NULLs
+ id = (Long) jsonObject.get(ID);
+
+ connectorValidation = restoreValidation(
+ (JSONObject)jsonObject.get(CONNECTOR));
+ frameworkValidation = restoreValidation(
+ (JSONObject)jsonObject.get(FRAMEWORK));
+ }
+
+ public Validation restoreValidation(JSONObject jsonObject) {
+ JSONObject jsonMessages = (JSONObject) jsonObject.get(MESSAGES);
+ Map<Validation.FormInput, Validation.Message> messages
+ = new HashMap<Validation.FormInput, Validation.Message>();
+
+ for(Object key : jsonMessages.keySet()) {
+ JSONObject jsonMessage = (JSONObject) jsonMessages.get(key);
+
+ Status status = Status.valueOf((String) jsonMessage.get(STATUS));
+ String stringMessage = (String) jsonMessage.get(MESSAGE);
+
+ Validation.Message message
+ = new Validation.Message(status, stringMessage);
+
+ messages.put(new Validation.FormInput((String)key), message);
+ }
+
+ Status status = Status.valueOf((String) jsonObject.get(STATUS));
+
+ return new Validation(status, messages);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java b/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java
index cbe049a..ed1de6e 100644
--- a/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/ConnectorBean.java
@@ -24,8 +24,8 @@ import java.util.Map;
import java.util.ResourceBundle;
import java.util.Set;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.model.MConnectionForms;
-import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MJobForms;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MForm;
@@ -73,14 +73,13 @@ public class ConnectorBean implements JsonBean {
object.put(NAME, connector.getUniqueName());
object.put(CLASS, connector.getClassName());
object.put(VERSION, connector.getVersion());
- object.put(CON_FORMS, extractForms(connector.getConnectionForms().getForms(), skipSensitive));
-
- JSONObject jobForms = new JSONObject();
- for (MJobForms job : connector.getAllJobsForms().values()) {
- jobForms.put(job.getType().name(), extractForms(job.getForms(), skipSensitive));
- }
- object.put(JOB_FORMS, jobForms);
+ object.put(CON_FORMS, extractForms(connector.getConnectionForms().getForms(), skipSensitive));
+ object.put(JOB_FORMS, new JSONObject());
+ ((JSONObject)object.get(JOB_FORMS)).put(
+ ConnectorType.FROM, extractForms(connector.getJobForms(ConnectorType.FROM).getForms(), skipSensitive));
+ ((JSONObject)object.get(JOB_FORMS)).put(
+ ConnectorType.TO, extractForms(connector.getJobForms(ConnectorType.TO).getForms(), skipSensitive));
array.add(object);
}
@@ -119,17 +118,17 @@ public class ConnectorBean implements JsonBean {
List<MForm> connForms = restoreForms((JSONArray) object.get(CON_FORMS));
JSONObject jobJson = (JSONObject) object.get(JOB_FORMS);
- List<MJobForms> jobs = new ArrayList<MJobForms>();
- for( Map.Entry entry : (Set<Map.Entry>) jobJson.entrySet()) {
- MJob.Type type = MJob.Type.valueOf((String) entry.getKey());
-
- List<MForm> jobForms =
- restoreForms((JSONArray) jobJson.get(entry.getKey()));
-
- jobs.add(new MJobForms(type, jobForms));
- }
-
- MConnector connector = new MConnector(uniqueName, className, version, new MConnectionForms(connForms), jobs);
+ JSONArray fromJobJson = (JSONArray)jobJson.get(ConnectorType.FROM.name());
+ JSONArray toJobJson = (JSONArray)jobJson.get(ConnectorType.TO.name());
+ List<MForm> fromJobForms =
+ restoreForms(fromJobJson);
+ List<MForm> toJobForms =
+ restoreForms(toJobJson);
+ MJobForms fromJob = new MJobForms(fromJobForms);
+ MJobForms toJob = new MJobForms(toJobForms);
+ MConnectionForms connection = new MConnectionForms(connForms);
+
+ MConnector connector = new MConnector(uniqueName, className, version, connection, fromJob, toJob);
connector.setPersistenceId(connectorId);
connectors.add(connector);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java b/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java
index eb79f98..abbdcc6 100644
--- a/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/FrameworkBean.java
@@ -18,6 +18,7 @@
package org.apache.sqoop.json;
import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MFramework;
import org.apache.sqoop.model.MJob;
@@ -65,13 +66,10 @@ public class FrameworkBean implements JsonBean {
@SuppressWarnings("unchecked")
@Override
public JSONObject extract(boolean skipSensitive) {
+ // @TODO(Abe): Add From/To connection forms.
JSONArray conForms =
extractForms(framework.getConnectionForms().getForms(), skipSensitive);
- JSONObject jobForms = new JSONObject();
-
- for (MJobForms job : framework.getAllJobsForms().values()) {
- jobForms.put(job.getType().name(), extractForms(job.getForms(), skipSensitive));
- }
+ JSONArray jobForms = extractForms(framework.getJobForms().getForms(), skipSensitive);
JSONObject result = new JSONObject();
result.put(ID, framework.getPersistenceId());
@@ -89,22 +87,13 @@ public class FrameworkBean implements JsonBean {
String frameworkVersion = (String) jsonObject.get(FRAMEWORK_VERSION);
List<MForm> connForms = restoreForms((JSONArray) jsonObject.get(CON_FORMS));
+ List<MForm> jobForms = restoreForms((JSONArray) jsonObject.get(JOB_FORMS));
- JSONObject jobForms = (JSONObject) jsonObject.get(JOB_FORMS);
-
- List<MJobForms> jobs = new ArrayList<MJobForms>();
- for( Map.Entry entry : (Set<Map.Entry>) jobForms.entrySet()) {
- //TODO(jarcec): Handle situation when server is supporting operation
- // that client do not know (server do have newer version than client)
- MJob.Type type = MJob.Type.valueOf((String) entry.getKey());
-
- List<MForm> job = restoreForms((JSONArray) entry.getValue());
-
- jobs.add(new MJobForms(type, job));
- }
-
- framework = new MFramework(new MConnectionForms(connForms), jobs,
- frameworkVersion);
+ // @TODO(Abe): Get From/To connection forms.
+ framework = new MFramework(
+ new MConnectionForms(connForms),
+ new MJobForms(jobForms),
+ frameworkVersion);
framework.setPersistenceId(id);
bundle = restoreResourceBundle((JSONObject) jsonObject.get(RESOURCES));
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/json/JobBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/JobBean.java b/common/src/main/java/org/apache/sqoop/json/JobBean.java
index 1555bd5..cb659ae 100644
--- a/common/src/main/java/org/apache/sqoop/json/JobBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/JobBean.java
@@ -17,6 +17,7 @@
*/
package org.apache.sqoop.json;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MJobForms;
@@ -42,10 +43,12 @@ public class JobBean implements JsonBean {
private static final String ALL = "all";
private static final String ID = "id";
private static final String NAME = "name";
- private static final String TYPE = "type";
- private static final String CONNECTION_ID = "connection-id";
- private static final String CONNECTOR_ID = "connector-id";
- private static final String CONNECTOR_PART = "connector";
+ private static final String FROM_CONNECTION_ID = "from-connection-id";
+ private static final String TO_CONNECTION_ID = "to-connection-id";
+ private static final String FROM_CONNECTOR_ID = "from-connector-id";
+ private static final String TO_CONNECTOR_ID = "to-connector-id";
+ private static final String FROM_CONNECTOR_PART = "from-connector";
+ private static final String TO_CONNECTOR_PART = "to-connector";
private static final String FRAMEWORK_PART = "framework";
// Compulsory
@@ -106,16 +109,19 @@ public class JobBean implements JsonBean {
object.put(ID, job.getPersistenceId());
object.put(NAME, job.getName());
- object.put(TYPE, job.getType().name());
object.put(ENABLED, job.getEnabled());
object.put(CREATION_USER, job.getCreationUser());
object.put(CREATION_DATE, job.getCreationDate().getTime());
object.put(UPDATE_USER, job.getLastUpdateUser());
object.put(UPDATE_DATE, job.getLastUpdateDate().getTime());
- object.put(CONNECTION_ID, job.getConnectionId());
- object.put(CONNECTOR_ID, job.getConnectorId());
- object.put(CONNECTOR_PART,
- extractForms(job.getConnectorPart().getForms(), skipSensitive));
+ object.put(FROM_CONNECTION_ID, job.getConnectionId(ConnectorType.FROM));
+ object.put(TO_CONNECTION_ID, job.getConnectionId(ConnectorType.TO));
+ object.put(FROM_CONNECTOR_ID, job.getConnectorId(ConnectorType.FROM));
+ object.put(TO_CONNECTOR_ID, job.getConnectorId(ConnectorType.TO));
+ object.put(FROM_CONNECTOR_PART,
+ extractForms(job.getConnectorPart(ConnectorType.FROM).getForms(),skipSensitive));
+ object.put(TO_CONNECTOR_PART,
+ extractForms(job.getConnectorPart(ConnectorType.TO).getForms(), skipSensitive));
object.put(FRAMEWORK_PART,
extractForms(job.getFrameworkPart().getForms(), skipSensitive));
@@ -151,23 +157,26 @@ public class JobBean implements JsonBean {
for (Object obj : array) {
JSONObject object = (JSONObject) obj;
- long connectorId = (Long) object.get(CONNECTOR_ID);
- long connectionId = (Long) object.get(CONNECTION_ID);
- JSONArray connectorPart = (JSONArray) object.get(CONNECTOR_PART);
+ long fromConnectorId = (Long) object.get(FROM_CONNECTOR_ID);
+ long toConnectorId = (Long) object.get(TO_CONNECTOR_ID);
+ long fromConnectionId = (Long) object.get(FROM_CONNECTION_ID);
+ long toConnectionId = (Long) object.get(TO_CONNECTION_ID);
+ JSONArray fromConnectorPart = (JSONArray) object.get(FROM_CONNECTOR_PART);
+ JSONArray toConnectorPart = (JSONArray) object.get(TO_CONNECTOR_PART);
JSONArray frameworkPart = (JSONArray) object.get(FRAMEWORK_PART);
- String stringType = (String) object.get(TYPE);
- MJob.Type type = MJob.Type.valueOf(stringType);
-
- List<MForm> connectorForms = restoreForms(connectorPart);
+ List<MForm> fromConnectorParts = restoreForms(fromConnectorPart);
+ List<MForm> toConnectorParts = restoreForms(toConnectorPart);
List<MForm> frameworkForms = restoreForms(frameworkPart);
MJob job = new MJob(
- connectorId,
- connectionId,
- type,
- new MJobForms(type, connectorForms),
- new MJobForms(type, frameworkForms)
+ fromConnectorId,
+ toConnectorId,
+ fromConnectionId,
+ toConnectionId,
+ new MJobForms(fromConnectorParts),
+ new MJobForms(toConnectorParts),
+ new MJobForms(frameworkForms)
);
job.setPersistenceId((Long) object.get(ID));
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/json/JobValidationBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/JobValidationBean.java b/common/src/main/java/org/apache/sqoop/json/JobValidationBean.java
new file mode 100644
index 0000000..95c24ff
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/json/JobValidationBean.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.json;
+
+import org.apache.sqoop.common.ConnectorType;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.Validation;
+import org.json.simple.JSONObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bean for sending job validations across network. This bean will move three
+ * validation objects at one time - one for each connector (FROM and TO) and
+ * one for the framework part of the validated job. Optionally the bean can also
+ * transfer the created persistent id in case that a new entity was created.
+ */
+public class JobValidationBean implements JsonBean {
+
+ private static final String ID = "id";
+ private static final String FRAMEWORK = "framework";
+ private static final String CONNECTOR = "connector";
+ private static final String FROM = "from";
+ private static final String TO = "to";
+ private static final String STATUS = "status";
+ private static final String MESSAGE = "message";
+ private static final String MESSAGES = "messages";
+
+ private Long id;
+ private Map<ConnectorType, Validation> connectorValidation;
+ private Validation frameworkValidation;
+
+ // For "extract"
+ public JobValidationBean(Validation fromConnector, Validation framework, Validation toConnector) {
+ this();
+
+ this.connectorValidation = new HashMap<ConnectorType, Validation>();
+ this.connectorValidation.put(ConnectorType.FROM, fromConnector);
+ this.connectorValidation.put(ConnectorType.TO, toConnector);
+ this.frameworkValidation = framework;
+ }
+
+ // For "restore"
+ public JobValidationBean() {
+ id = null;
+ connectorValidation = new HashMap<ConnectorType, Validation>();
+ }
+
+ public Validation getConnectorValidation(ConnectorType type) {
+ return connectorValidation.get(type);
+ }
+
+ public Validation getFrameworkValidation() {
+ return frameworkValidation;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ public Long getId() {
+ return id;
+ }
+
+ @SuppressWarnings("unchecked")
+ public JSONObject extract(boolean skipSensitive) {
+ JSONObject object = new JSONObject();
+ JSONObject connectorObject = new JSONObject();
+
+ // Optionally transfer id
+ if(id != null) {
+ object.put(ID, id);
+ }
+
+ connectorObject.put(FROM, extractValidation(getConnectorValidation(ConnectorType.FROM)));
+ connectorObject.put(TO, extractValidation(getConnectorValidation(ConnectorType.TO)));
+
+ object.put(FRAMEWORK, extractValidation(frameworkValidation));
+ object.put(CONNECTOR, connectorObject);
+
+ return object;
+ }
+
+ @SuppressWarnings("unchecked")
+ private JSONObject extractValidation(Validation validation) {
+ JSONObject object = new JSONObject();
+
+ object.put(STATUS, validation.getStatus().name());
+
+ JSONObject jsonMessages = new JSONObject();
+ Map<Validation.FormInput, Validation.Message> messages = validation.getMessages();
+
+ for(Map.Entry<Validation.FormInput, Validation.Message> entry : messages.entrySet()) {
+ JSONObject jsonEntry = new JSONObject();
+ jsonEntry.put(STATUS, entry.getValue().getStatus().name());
+ jsonEntry.put(MESSAGE, entry.getValue().getMessage());
+ jsonMessages.put(entry.getKey(), jsonEntry);
+ }
+
+ object.put(MESSAGES, jsonMessages);
+
+ return object;
+ }
+
+ @Override
+ public void restore(JSONObject jsonObject) {
+ // Optional and accepting NULLs
+ id = (Long) jsonObject.get(ID);
+
+ JSONObject jsonConnectorObject = (JSONObject)jsonObject.get(CONNECTOR);
+
+ connectorValidation.put(ConnectorType.FROM, restoreValidation(
+ (JSONObject)jsonConnectorObject.get(FROM)));
+ connectorValidation.put(ConnectorType.TO, restoreValidation(
+ (JSONObject)jsonConnectorObject.get(TO)));
+ frameworkValidation = restoreValidation(
+ (JSONObject)jsonObject.get(FRAMEWORK));
+ }
+
+ public Validation restoreValidation(JSONObject jsonObject) {
+ JSONObject jsonMessages = (JSONObject) jsonObject.get(MESSAGES);
+ Map<Validation.FormInput, Validation.Message> messages
+ = new HashMap<Validation.FormInput, Validation.Message>();
+
+ for(Object key : jsonMessages.keySet()) {
+ JSONObject jsonMessage = (JSONObject) jsonMessages.get(key);
+
+ Status status = Status.valueOf((String) jsonMessage.get(STATUS));
+ String stringMessage = (String) jsonMessage.get(MESSAGE);
+
+ Validation.Message message
+ = new Validation.Message(status, stringMessage);
+
+ messages.put(new Validation.FormInput((String)key), message);
+ }
+
+ Status status = Status.valueOf((String) jsonObject.get(STATUS));
+
+ return new Validation(status, messages);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/json/ValidationBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/ValidationBean.java b/common/src/main/java/org/apache/sqoop/json/ValidationBean.java
deleted file mode 100644
index fd36825..0000000
--- a/common/src/main/java/org/apache/sqoop/json/ValidationBean.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.json;
-
-import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.Validation;
-import org.json.simple.JSONObject;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Bean for sending validations across network. This bean will move two
- * validation objects at one time - one for connector and second for framework
- * part of validated entity. Optionally validation bean can also transfer
- * created persistent id in case that new entity was created.
- */
-public class ValidationBean implements JsonBean {
-
- private static final String ID = "id";
- private static final String FRAMEWORK = "framework";
- private static final String CONNECTOR = "connector";
- private static final String STATUS = "status";
- private static final String MESSAGE = "message";
- private static final String MESSAGES = "messages";
-
- private Long id;
- private Validation connectorValidation;
- private Validation frameworkValidation;
-
- // For "extract"
- public ValidationBean(Validation connector, Validation framework) {
- this();
-
- this.connectorValidation = connector;
- this.frameworkValidation = framework;
- }
-
- // For "restore"
- public ValidationBean() {
- id = null;
- }
-
- public Validation getConnectorValidation() {
- return connectorValidation;
- }
-
- public Validation getFrameworkValidation() {
- return frameworkValidation;
- }
-
- public void setId(Long id) {
- this.id = id;
- }
-
- public Long getId() {
- return id;
- }
-
- @SuppressWarnings("unchecked")
- public JSONObject extract(boolean skipSensitive) {
- JSONObject object = new JSONObject();
-
- // Optionally transfer id
- if(id != null) {
- object.put(ID, id);
- }
-
- object.put(CONNECTOR, extractValidation(connectorValidation));
- object.put(FRAMEWORK, extractValidation(frameworkValidation));
-
- return object;
- }
-
- @SuppressWarnings("unchecked")
- private JSONObject extractValidation(Validation validation) {
- JSONObject object = new JSONObject();
-
- object.put(STATUS, validation.getStatus().name());
-
- JSONObject jsonMessages = new JSONObject();
- Map<Validation.FormInput, Validation.Message> messages = validation.getMessages();
-
- for(Map.Entry<Validation.FormInput, Validation.Message> entry : messages.entrySet()) {
- JSONObject jsonEntry = new JSONObject();
- jsonEntry.put(STATUS, entry.getValue().getStatus().name());
- jsonEntry.put(MESSAGE, entry.getValue().getMessage());
- jsonMessages.put(entry.getKey(), jsonEntry);
- }
-
- object.put(MESSAGES, jsonMessages);
-
- return object;
- }
-
- @Override
- public void restore(JSONObject jsonObject) {
- // Optional and accepting NULLs
- id = (Long) jsonObject.get(ID);
-
- connectorValidation = restoreValidation(
- (JSONObject)jsonObject.get(CONNECTOR));
- frameworkValidation = restoreValidation(
- (JSONObject)jsonObject.get(FRAMEWORK));
- }
-
- public Validation restoreValidation(JSONObject jsonObject) {
- JSONObject jsonMessages = (JSONObject) jsonObject.get(MESSAGES);
- Map<Validation.FormInput, Validation.Message> messages
- = new HashMap<Validation.FormInput, Validation.Message>();
-
- for(Object key : jsonMessages.keySet()) {
- JSONObject jsonMessage = (JSONObject) jsonMessages.get(key);
-
- Status status = Status.valueOf((String) jsonMessage.get(STATUS));
- String stringMessage = (String) jsonMessage.get(MESSAGE);
-
- Validation.Message message
- = new Validation.Message(status, stringMessage);
-
- messages.put(new Validation.FormInput((String)key), message);
- }
-
- Status status = Status.valueOf((String) jsonObject.get(STATUS));
-
- return new Validation(status, messages);
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/model/MConnector.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MConnector.java b/common/src/main/java/org/apache/sqoop/model/MConnector.java
index 43fad27..a7518d2 100644
--- a/common/src/main/java/org/apache/sqoop/model/MConnector.java
+++ b/common/src/main/java/org/apache/sqoop/model/MConnector.java
@@ -17,8 +17,10 @@
*/
package org.apache.sqoop.model;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sqoop.common.ConnectorType;
/**
* Connector metadata.
@@ -26,14 +28,23 @@ import java.util.List;
* Includes unique id that identifies connector in metadata store, unique human
* readable name, corresponding name and all forms for all supported job types.
*/
-public final class MConnector extends MFramework {
+public final class MConnector extends MPersistableEntity implements MClonable {
private final String uniqueName;
private final String className;
+ private final MConnectionForms connectionForms;
+ private final Map<ConnectorType, MJobForms> jobForms;
+ String version;
+
+ public MConnector(String uniqueName, String className,
+ String version, MConnectionForms connectionForms,
+ MJobForms fromJobForms, MJobForms toJobForms) {
+ this.jobForms = new HashMap<ConnectorType, MJobForms>();
- public MConnector(String uniqueName, String className, String version,
- MConnectionForms connectionForms, List<MJobForms> jobForms) {
- super(connectionForms, jobForms, version);
+ this.version = version;
+ this.connectionForms = connectionForms;
+ this.jobForms.put(ConnectorType.FROM, fromJobForms);
+ this.jobForms.put(ConnectorType.TO, toJobForms);
if (uniqueName == null || className == null) {
throw new NullPointerException();
@@ -57,10 +68,8 @@ public final class MConnector extends MFramework {
sb.append(uniqueName).append(":").append(getPersistenceId()).append(":");
sb.append(className);
sb.append(", ").append(getConnectionForms().toString());
- for(MJobForms entry: getAllJobsForms().values()) {
- sb.append(entry.toString());
- }
-
+ sb.append(", ").append(getJobForms(ConnectorType.FROM).toString());
+ sb.append(", ").append(getJobForms(ConnectorType.TO).toString());
return sb.toString();
}
@@ -78,32 +87,49 @@ public final class MConnector extends MFramework {
return uniqueName.equals(mc.uniqueName)
&& className.equals(mc.className)
&& version.equals(mc.version)
- && super.equals(other);
+ && connectionForms.equals(mc.getConnectionForms())
+ && jobForms.get(ConnectorType.FROM).equals(mc.getJobForms(ConnectorType.FROM))
+ && jobForms.get(ConnectorType.TO).equals(mc.getJobForms(ConnectorType.TO));
}
@Override
public int hashCode() {
- int result = super.hashCode();
+ int result = getConnectionForms().hashCode();
+ result = 31 * result + getJobForms(ConnectorType.FROM).hashCode();
+ result = 31 * result + getJobForms(ConnectorType.TO).hashCode();
+ result = 31 * result + version.hashCode();
result = 31 * result + uniqueName.hashCode();
result = 31 * result + className.hashCode();
-
return result;
}
- @Override
public MConnector clone(boolean cloneWithValue) {
//Connector never have any values filled
cloneWithValue = false;
- List<MJobForms> copyJobForms = null;
- if(this.getAllJobsForms()!=null) {
- copyJobForms = new ArrayList<MJobForms>();
- for(MJobForms entry: this.getAllJobsForms().values()) {
- copyJobForms.add(entry.clone(cloneWithValue));
- }
- }
- MConnector copy = new MConnector(this.getUniqueName(), this.getClassName(), this.getVersion(),
- this.getConnectionForms().clone(cloneWithValue), copyJobForms);
+ MConnector copy = new MConnector(
+ this.getUniqueName(),
+ this.getClassName(),
+ this.getVersion(),
+ this.getConnectionForms().clone(cloneWithValue),
+ this.getJobForms(ConnectorType.FROM).clone(cloneWithValue),
+ this.getJobForms(ConnectorType.TO).clone(cloneWithValue));
copy.setPersistenceId(this.getPersistenceId());
return copy;
}
+
+ public MConnectionForms getConnectionForms() {
+ return connectionForms;
+ }
+
+ public MJobForms getJobForms(ConnectorType type) {
+ return jobForms.get(type);
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/model/MFramework.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MFramework.java b/common/src/main/java/org/apache/sqoop/model/MFramework.java
index c742459..580db9c 100644
--- a/common/src/main/java/org/apache/sqoop/model/MFramework.java
+++ b/common/src/main/java/org/apache/sqoop/model/MFramework.java
@@ -17,38 +17,21 @@
*/
package org.apache.sqoop.model;
-import org.apache.sqoop.common.SqoopException;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
- * Metadata describing framework options for connection and job for each
- * supported job type.
+ * Metadata describing framework options for connection and job. The
+ * framework carries a single set of job forms.
*/
public class MFramework extends MPersistableEntity implements MClonable {
private final MConnectionForms connectionForms;
- private final Map<MJob.Type, MJobForms> jobs;
+ private final MJobForms jobForms;
String version;
- public MFramework(MConnectionForms connectionForms, List<MJobForms> jobForms,
+ public MFramework(MConnectionForms connectionForms, MJobForms jobForms,
String version) {
this.version = version;
this.connectionForms = connectionForms;
- this.jobs = new HashMap<MJob.Type, MJobForms>();
-
- for (MJobForms job : jobForms) {
- MJob.Type type = job.getType();
-
- if(this.jobs.containsKey(type)) {
- throw new SqoopException(ModelError.MODEL_001, "Duplicate entry for"
- + " jobForms type " + job.getType().name());
- }
- this.jobs.put(type, job);
- }
+ this.jobForms = jobForms;
}
@Override
@@ -57,9 +40,7 @@ public class MFramework extends MPersistableEntity implements MClonable {
sb.append(getPersistenceId()).append(":");
sb.append("version = " + version);
sb.append(", ").append(connectionForms.toString());
- for(MJobForms entry: jobs.values()) {
- sb.append(entry.toString());
- }
+ sb.append(jobForms.toString());
return sb.toString();
}
@@ -77,16 +58,13 @@ public class MFramework extends MPersistableEntity implements MClonable {
MFramework mo = (MFramework) other;
return version.equals(mo.getVersion()) &&
connectionForms.equals(mo.connectionForms) &&
- jobs.equals(mo.jobs);
+ jobForms.equals(mo.jobForms);
}
@Override
public int hashCode() {
int result = connectionForms.hashCode();
-
- for(MJobForms entry: jobs.values()) {
- result = 31 * result + entry.hashCode();
- }
+ result = 31 * result + jobForms.hashCode();
result = 31 * result + version.hashCode();
return result;
}
@@ -95,27 +73,16 @@ public class MFramework extends MPersistableEntity implements MClonable {
return connectionForms;
}
- public Map<MJob.Type, MJobForms> getAllJobsForms() {
- return jobs;
- }
-
- public MJobForms getJobForms(MJob.Type type) {
- return jobs.get(type);
+ public MJobForms getJobForms() {
+ return jobForms;
}
@Override
public MFramework clone(boolean cloneWithValue) {
//Framework never have any values filled
cloneWithValue = false;
- List<MJobForms> copyJobForms = null;
- if(this.getAllJobsForms()!=null) {
- copyJobForms = new ArrayList<MJobForms>();
- for(MJobForms entry: this.getAllJobsForms().values()) {
- copyJobForms.add(entry.clone(cloneWithValue));
- }
- }
MFramework copy = new MFramework(this.getConnectionForms().clone(cloneWithValue),
- copyJobForms, this.version);
+ this.getJobForms().clone(cloneWithValue), this.version);
copy.setPersistenceId(this.getPersistenceId());
return copy;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/model/MJob.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MJob.java b/common/src/main/java/org/apache/sqoop/model/MJob.java
index 849168d..6802a74 100644
--- a/common/src/main/java/org/apache/sqoop/model/MJob.java
+++ b/common/src/main/java/org/apache/sqoop/model/MJob.java
@@ -17,19 +17,16 @@
*/
package org.apache.sqoop.model;
-import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.common.ConnectorType;
+
+import java.util.HashMap;
+import java.util.Map;
/**
* Model describing entire job object including both connector and
* framework part.
*/
public class MJob extends MAccountableEntity implements MClonable {
-
- public static enum Type {
- IMPORT,
- EXPORT,
- }
-
/**
* Connector reference.
*
@@ -37,46 +34,47 @@ public class MJob extends MAccountableEntity implements MClonable {
* dependency through connection object, but having this dependency explicitly
* carried along helps a lot.
*/
- private final long connectorId;
+ private final Map<ConnectorType, Long> connectorIds;
/**
- * Corresponding connection object.
+ * Corresponding connection objects for connector.
*/
- private final long connectionId;
+ private final Map<ConnectorType, Long> connectionIds;
/**
* User name for this object
*/
private String name;
- /**
- * Job type
- */
- private final Type type;
-
- private final MJobForms connectorPart;
+ private final Map<ConnectorType, MJobForms> connectorParts;
private final MJobForms frameworkPart;
/**
* Default constructor to build new MJob model.
*
- * @param connectorId Connector id
- * @param connectionId Connection id
- * @param type Job type
- * @param connectorPart Connector forms
+ * @param fromConnectorId Connector id
+ * @param fromConnectionId Connection id
+ * @param fromPart From Connector forms
+ * @param toPart To Connector forms
* @param frameworkPart Framework forms
*/
- public MJob(long connectorId,
- long connectionId,
- Type type,
- MJobForms connectorPart,
+ public MJob(long fromConnectorId,
+ long toConnectorId,
+ long fromConnectionId,
+ long toConnectionId,
+ MJobForms fromPart,
+ MJobForms toPart,
MJobForms frameworkPart) {
- this.connectorId = connectorId;
- this.connectionId = connectionId;
- this.type = type;
- this.connectorPart = connectorPart;
+ connectorIds = new HashMap<ConnectorType, Long>();
+ connectorIds.put(ConnectorType.FROM, fromConnectorId);
+ connectorIds.put(ConnectorType.TO, toConnectorId);
+ connectionIds = new HashMap<ConnectorType, Long>();
+ connectionIds.put(ConnectorType.FROM, fromConnectionId);
+ connectionIds.put(ConnectorType.TO, toConnectionId);
+ connectorParts = new HashMap<ConnectorType, MJobForms>();
+ connectorParts.put(ConnectorType.FROM, fromPart);
+ connectorParts.put(ConnectorType.TO, toPart);
this.frameworkPart = frameworkPart;
- verifyFormsOfSameType();
}
/**
@@ -85,7 +83,10 @@ public class MJob extends MAccountableEntity implements MClonable {
* @param other MConnection model to copy
*/
public MJob(MJob other) {
- this(other, other.connectorPart.clone(true), other.frameworkPart.clone(true));
+ this(other,
+ other.getConnectorPart(ConnectorType.FROM).clone(true),
+ other.frameworkPart.clone(true),
+ other.getConnectorPart(ConnectorType.TO).clone(true));
}
/**
@@ -95,34 +96,31 @@ public class MJob extends MAccountableEntity implements MClonable {
* used otherwise.
*
* @param other MJob model to copy
- * @param connectorPart Connector forms
+ * @param fromPart From Connector forms
* @param frameworkPart Framework forms
+ * @param toPart To Connector forms
*/
- public MJob(MJob other, MJobForms connectorPart, MJobForms frameworkPart) {
+ public MJob(MJob other, MJobForms fromPart, MJobForms frameworkPart, MJobForms toPart) {
super(other);
- this.connectionId = other.connectionId;
- this.connectorId = other.connectorId;
- this.type = other.type;
+ connectorIds = new HashMap<ConnectorType, Long>();
+ connectorIds.put(ConnectorType.FROM, other.getConnectorId(ConnectorType.FROM));
+ connectorIds.put(ConnectorType.TO, other.getConnectorId(ConnectorType.TO));
+ connectionIds = new HashMap<ConnectorType, Long>();
+ connectionIds.put(ConnectorType.FROM, other.getConnectionId(ConnectorType.FROM));
+ connectionIds.put(ConnectorType.TO, other.getConnectionId(ConnectorType.TO));
+ connectorParts = new HashMap<ConnectorType, MJobForms>();
+ connectorParts.put(ConnectorType.FROM, fromPart);
+ connectorParts.put(ConnectorType.TO, toPart);
this.name = other.name;
- this.connectorPart = connectorPart;
this.frameworkPart = frameworkPart;
- verifyFormsOfSameType();
- }
-
- private void verifyFormsOfSameType() {
- if (type != connectorPart.getType() || type != frameworkPart.getType()) {
- throw new SqoopException(ModelError.MODEL_002,
- "Incompatible types, job: " + type.name()
- + ", connector part: " + connectorPart.getType().name()
- + ", framework part: " + frameworkPart.getType().name()
- );
- }
}
@Override
public String toString() {
- StringBuilder sb = new StringBuilder("job connector-part: ");
- sb.append(connectorPart).append(", framework-part: ").append(frameworkPart);
+ StringBuilder sb = new StringBuilder("job");
+ sb.append(" connector-from-part: ").append(getConnectorPart(ConnectorType.FROM));
+ sb.append(", connector-to-part: ").append(getConnectorPart(ConnectorType.TO));
+ sb.append(", framework-part: ").append(frameworkPart);
return sb.toString();
}
@@ -135,32 +133,35 @@ public class MJob extends MAccountableEntity implements MClonable {
this.name = name;
}
- public long getConnectionId() {
- return connectionId;
+ public long getConnectionId(ConnectorType type) {
+ return connectionIds.get(type);
}
- public long getConnectorId() {
- return connectorId;
+ public long getConnectorId(ConnectorType type) {
+ return connectorIds.get(type);
}
- public MJobForms getConnectorPart() {
- return connectorPart;
+ public MJobForms getConnectorPart(ConnectorType type) {
+ return connectorParts.get(type);
}
public MJobForms getFrameworkPart() {
return frameworkPart;
}
- public Type getType() {
- return type;
- }
-
@Override
public MJob clone(boolean cloneWithValue) {
if(cloneWithValue) {
return new MJob(this);
} else {
- return new MJob(connectorId, connectionId, type, connectorPart.clone(false), frameworkPart.clone(false));
+ return new MJob(
+ getConnectorId(ConnectorType.FROM),
+ getConnectorId(ConnectorType.TO),
+ getConnectionId(ConnectorType.FROM),
+ getConnectionId(ConnectorType.TO),
+ getConnectorPart(ConnectorType.FROM).clone(false),
+ getConnectorPart(ConnectorType.TO).clone(false),
+ frameworkPart.clone(false));
}
}
@@ -175,11 +176,13 @@ public class MJob extends MAccountableEntity implements MClonable {
}
MJob job = (MJob)object;
- return (job.connectorId == this.connectorId)
- && (job.connectionId == this.connectionId)
+ return (job.getConnectorId(ConnectorType.FROM) == this.getConnectorId(ConnectorType.FROM))
+ && (job.getConnectorId(ConnectorType.TO) == this.getConnectorId(ConnectorType.TO))
+ && (job.getConnectionId(ConnectorType.FROM) == this.getConnectionId(ConnectorType.FROM))
+ && (job.getConnectionId(ConnectorType.TO) == this.getConnectionId(ConnectorType.TO))
&& (job.getPersistenceId() == this.getPersistenceId())
- && (job.type.equals(this.type))
- && (job.connectorPart.equals(this.connectorPart))
+ && (job.getConnectorPart(ConnectorType.FROM).equals(this.getConnectorPart(ConnectorType.FROM)))
+ && (job.getConnectorPart(ConnectorType.TO).equals(this.getConnectorPart(ConnectorType.TO)))
&& (job.frameworkPart.equals(this.frameworkPart));
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/common/src/main/java/org/apache/sqoop/model/MJobForms.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MJobForms.java b/common/src/main/java/org/apache/sqoop/model/MJobForms.java
index f697023..08b9a78 100644
--- a/common/src/main/java/org/apache/sqoop/model/MJobForms.java
+++ b/common/src/main/java/org/apache/sqoop/model/MJobForms.java
@@ -20,28 +20,12 @@ package org.apache.sqoop.model;
import java.util.List;
/**
- * Metadata describing all required information to build up an job
- * object for one part. Both connector and framework need to supply this object
- * to build up entire job.
+ * Metadata describing all required information to build a job
+ * object with two connectors and a framework.
*/
public class MJobForms extends MFormList {
-
- private final MJob.Type type;
-
- public MJobForms(MJob.Type type, List<MForm> forms) {
+ public MJobForms(List<MForm> forms) {
super(forms);
- this.type = type;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("Job type: ").append(type.name());
- sb.append(super.toString());
- return sb.toString();
- }
-
- public MJob.Type getType() {
- return type;
}
@Override
@@ -55,19 +39,17 @@ public class MJobForms extends MFormList {
}
MJobForms mj = (MJobForms) other;
- return type.equals(mj.type) && super.equals(mj);
+ return super.equals(mj);
}
@Override
public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + type.hashCode();
- return result;
+ return super.hashCode();
}
@Override
public MJobForms clone(boolean cloneWithValue) {
- MJobForms copy = new MJobForms(this.type, super.clone(cloneWithValue).getForms());
+ MJobForms copy = new MJobForms(super.clone(cloneWithValue).getForms());
return copy;
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
index e0da80f..442e588 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
@@ -20,31 +20,31 @@ package org.apache.sqoop.connector.jdbc;
import java.util.Locale;
import java.util.ResourceBundle;
+import org.apache.sqoop.common.ConnectorType;
import org.apache.sqoop.common.VersionInfo;
import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
import org.apache.sqoop.connector.spi.MetadataUpgrader;
-import org.apache.sqoop.job.etl.Exporter;
-import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.model.MJob;
import org.apache.sqoop.validation.Validator;
public class GenericJdbcConnector extends SqoopConnector {
private static GenericJdbcValidator genericJdbcValidator = new GenericJdbcValidator();
- private static final Importer IMPORTER = new Importer(
- GenericJdbcImportInitializer.class,
- GenericJdbcImportPartitioner.class,
- GenericJdbcImportExtractor.class,
- GenericJdbcImportDestroyer.class);
+ private static final From FROM = new From(
+ GenericJdbcFromInitializer.class,
+ GenericJdbcPartitioner.class,
+ GenericJdbcExtractor.class,
+ GenericJdbcFromDestroyer.class);
- private static final Exporter EXPORTER = new Exporter(
- GenericJdbcExportInitializer.class,
- GenericJdbcExportLoader.class,
- GenericJdbcExportDestroyer.class);
+ private static final To TO = new To(
+ GenericJdbcToInitializer.class,
+ GenericJdbcLoader.class,
+ GenericJdbcToDestroyer.class);
/**
@@ -70,25 +70,25 @@ public class GenericJdbcConnector extends SqoopConnector {
}
@Override
- public Class getJobConfigurationClass(MJob.Type jobType) {
+ public Class getJobConfigurationClass(ConnectorType jobType) {
switch (jobType) {
- case IMPORT:
- return ImportJobConfiguration.class;
- case EXPORT:
- return ExportJobConfiguration.class;
+ case FROM:
+ return FromJobConfiguration.class;
+ case TO:
+ return ToJobConfiguration.class;
default:
return null;
}
}
@Override
- public Importer getImporter() {
- return IMPORTER;
+ public From getFrom() {
+ return FROM;
}
@Override
- public Exporter getExporter() {
- return EXPORTER;
+ public To getTo() {
+ return TO;
}
@Override
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
index abcc89d..a51fb7d 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
@@ -42,8 +42,10 @@ public final class GenericJdbcConnectorConstants {
public static final String CONNECTOR_JDBC_PARTITION_MAXVALUE =
PREFIX_CONNECTOR_JDBC_CONFIG + "partition.maxvalue";
- public static final String CONNECTOR_JDBC_DATA_SQL =
- PREFIX_CONNECTOR_JDBC_CONFIG + "data.sql";
+ public static final String CONNECTOR_FROM_JDBC_DATA_SQL =
+ PREFIX_CONNECTOR_JDBC_CONFIG + "from.data.sql";
+ public static final String CONNECTOR_TO_JDBC_DATA_SQL =
+ PREFIX_CONNECTOR_JDBC_CONFIG + "to.data.sql";
public static final String SQL_CONDITIONS_TOKEN = "${CONDITIONS}";
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
deleted file mode 100644
index c5faa09..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.etl.Destroyer;
-import org.apache.sqoop.job.etl.DestroyerContext;
-
-public class GenericJdbcExportDestroyer extends Destroyer<ConnectionConfiguration, ExportJobConfiguration> {
-
- private static final Logger LOG = Logger.getLogger(GenericJdbcExportDestroyer.class);
-
- @Override
- public void destroy(DestroyerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
- LOG.info("Running generic JDBC connector destroyer");
-
- final String tableName = job.table.tableName;
- final String stageTableName = job.table.stageTableName;
- final boolean stageEnabled = stageTableName != null &&
- stageTableName.length() > 0;
- if(stageEnabled) {
- moveDataToDestinationTable(connection,
- context.isSuccess(), stageTableName, tableName);
- }
- }
-
- private void moveDataToDestinationTable(ConnectionConfiguration connectorConf,
- boolean success, String stageTableName, String tableName) {
- GenericJdbcExecutor executor =
- new GenericJdbcExecutor(connectorConf.connection.jdbcDriver,
- connectorConf.connection.connectionString,
- connectorConf.connection.username,
- connectorConf.connection.password);
-
- if(success) {
- LOG.info("Job completed, transferring data from stage table to " +
- "destination table.");
- executor.migrateData(stageTableName, tableName);
- } else {
- LOG.warn("Job failed, clearing stage table.");
- executor.deleteTableData(stageTableName);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
deleted file mode 100644
index ef39cdc..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.MutableContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.etl.Initializer;
-import org.apache.sqoop.job.etl.InitializerContext;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> {
-
- private GenericJdbcExecutor executor;
- private static final Logger LOG =
- Logger.getLogger(GenericJdbcExportInitializer.class);
-
- @Override
- public void initialize(InitializerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
- configureJdbcProperties(context.getContext(), connection, job);
- try {
- configureTableProperties(context.getContext(), connection, job);
- } finally {
- executor.close();
- }
- }
-
- @Override
- public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
- List<String> jars = new LinkedList<String>();
-
- jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver));
-
- return jars;
- }
-
- @Override
- public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ExportJobConfiguration exportJobConfiguration) {
- return null;
- }
-
- private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
- String driver = connectionConfig.connection.jdbcDriver;
- String url = connectionConfig.connection.connectionString;
- String username = connectionConfig.connection.username;
- String password = connectionConfig.connection.password;
-
- assert driver != null;
- assert url != null;
-
- executor = new GenericJdbcExecutor(driver, url, username, password);
- }
-
- private void configureTableProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
- String dataSql;
-
- String schemaName = jobConfig.table.schemaName;
- String tableName = jobConfig.table.tableName;
- String stageTableName = jobConfig.table.stageTableName;
- boolean clearStageTable = jobConfig.table.clearStageTable == null ?
- false : jobConfig.table.clearStageTable;
- final boolean stageEnabled =
- stageTableName != null && stageTableName.length() > 0;
- String tableSql = jobConfig.table.sql;
- String tableColumns = jobConfig.table.columns;
-
- if (tableName != null && tableSql != null) {
- // when both table name and table sql are specified:
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
-
- } else if (tableName != null) {
- // when table name is specified:
- if(stageEnabled) {
- LOG.info("Stage has been enabled.");
- LOG.info("Use stageTable: " + stageTableName +
- " with clearStageTable: " + clearStageTable);
-
- if(clearStageTable) {
- executor.deleteTableData(stageTableName);
- } else {
- long stageRowCount = executor.getTableRowCount(stageTableName);
- if(stageRowCount > 0) {
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0017);
- }
- }
- }
-
- // For databases that support schemas (IE: postgresql).
- final String tableInUse = stageEnabled ? stageTableName : tableName;
- String fullTableName = (schemaName == null) ?
- executor.delimitIdentifier(tableInUse) :
- executor.delimitIdentifier(schemaName) +
- "." + executor.delimitIdentifier(tableInUse);
-
- if (tableColumns == null) {
- String[] columns = executor.getQueryColumns("SELECT * FROM "
- + fullTableName + " WHERE 1 = 0");
- StringBuilder builder = new StringBuilder();
- builder.append("INSERT INTO ");
- builder.append(fullTableName);
- builder.append(" VALUES (?");
- for (int i = 1; i < columns.length; i++) {
- builder.append(",?");
- }
- builder.append(")");
- dataSql = builder.toString();
-
- } else {
- String[] columns = StringUtils.split(tableColumns, ',');
- StringBuilder builder = new StringBuilder();
- builder.append("INSERT INTO ");
- builder.append(fullTableName);
- builder.append(" (");
- builder.append(tableColumns);
- builder.append(") VALUES (?");
- for (int i = 1; i < columns.length; i++) {
- builder.append(",?");
- }
- builder.append(")");
- dataSql = builder.toString();
- }
- } else if (tableSql != null) {
- // when table sql is specified:
-
- if (tableSql.indexOf(
- GenericJdbcConnectorConstants.SQL_PARAMETER_MARKER) == -1) {
- // make sure parameter marker is in the specified sql
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0013);
- }
-
- if (tableColumns == null) {
- dataSql = tableSql;
- } else {
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0014);
- }
- } else {
- // when neither are specified:
- throw new SqoopException(
- GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
- }
-
- context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL,
- dataSql.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
deleted file mode 100644
index 15e7101..0000000
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.jdbc;
-
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.etl.Loader;
-import org.apache.sqoop.job.etl.LoaderContext;
-
-public class GenericJdbcExportLoader extends Loader<ConnectionConfiguration, ExportJobConfiguration> {
-
- public static final int DEFAULT_ROWS_PER_BATCH = 100;
- public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100;
- private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH;
- private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
-
- @Override
- public void load(LoaderContext context, ConnectionConfiguration connection, ExportJobConfiguration job) throws Exception{
- String driver = connection.connection.jdbcDriver;
- String url = connection.connection.connectionString;
- String username = connection.connection.username;
- String password = connection.connection.password;
- GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
- executor.setAutoCommit(false);
-
- String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL);
- executor.beginBatch(sql);
- try {
- int numberOfRows = 0;
- int numberOfBatches = 0;
- Object[] array;
-
- while ((array = context.getDataReader().readArrayRecord()) != null) {
- numberOfRows++;
- executor.addBatch(array);
-
- if (numberOfRows == rowsPerBatch) {
- numberOfBatches++;
- if (numberOfBatches == batchesPerTransaction) {
- executor.executeBatch(true);
- numberOfBatches = 0;
- } else {
- executor.executeBatch(false);
- }
- numberOfRows = 0;
- }
- }
-
- if (numberOfRows != 0 || numberOfBatches != 0) {
- // execute and commit the remaining rows
- executor.executeBatch(true);
- }
-
- executor.endBatch();
-
- } finally {
- executor.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
new file mode 100644
index 0000000..2428199
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExtractor.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.job.etl.Extractor;
+
+/**
+ * Extractor of the generic JDBC connector: runs the partition-specific query
+ * stored in the extractor context and hands every row to the data writer.
+ */
+public class GenericJdbcExtractor extends Extractor<ConnectionConfiguration, FromJobConfiguration, GenericJdbcPartition> {
+
+  public static final Logger LOG = Logger.getLogger(GenericJdbcExtractor.class);
+
+  /** Number of rows written by the most recent {@link #extract} call. */
+  private long rowsRead = 0;
+
+  /**
+   * Executes the query stored in the context under
+   * {@link GenericJdbcConnectorConstants#CONNECTOR_FROM_JDBC_DATA_SQL}, with the
+   * {@link GenericJdbcConnectorConstants#SQL_CONDITIONS_TOKEN} placeholder
+   * replaced by the conditions of the given partition, and writes each row
+   * as an object array to the context's data writer.
+   *
+   * @param context extractor context supplying the query and the data writer
+   * @param connection connection configuration (driver, URL, credentials)
+   * @param job job configuration; not read by this method
+   * @param partition partition whose conditions restrict the query
+   * @throws SqoopException with GENERIC_JDBC_CONNECTOR_0004 when reading the
+   *         result set fails with an SQLException
+   */
+  @Override
+  public void extract(ExtractorContext context, ConnectionConfiguration connection, FromJobConfiguration job, GenericJdbcPartition partition) {
+    String driver = connection.connection.jdbcDriver;
+    String url = connection.connection.connectionString;
+    String username = connection.connection.username;
+    String password = connection.connection.password;
+    GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
+
+    // Everything after the executor is created runs inside the try block so
+    // that the executor is closed even when preparing or running the query fails.
+    try {
+      String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_FROM_JDBC_DATA_SQL);
+      String conditions = partition.getConditions();
+      query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions);
+      LOG.info("Using query: " + query);
+
+      rowsRead = 0;
+      ResultSet resultSet = executor.executeQuery(query);
+
+      ResultSetMetaData metaData = resultSet.getMetaData();
+      int column = metaData.getColumnCount();
+      while (resultSet.next()) {
+        Object[] array = new Object[column];
+        for (int i = 0; i < column; i++) {
+          // Read each column exactly once (JDBC column indexes are 1-based).
+          Object value = resultSet.getObject(i + 1);
+          array[i] = value == null ? GenericJdbcConnectorConstants.SQL_NULL_VALUE : value;
+        }
+        context.getDataWriter().writeArrayRecord(array);
+        rowsRead++;
+      }
+    } catch (SQLException e) {
+      throw new SqoopException(
+          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0004, e);
+
+    } finally {
+      executor.close();
+    }
+  }
+
+  /**
+   * @return number of rows written by the most recent {@link #extract} call
+   */
+  @Override
+  public long getRowsRead() {
+    return rowsRead;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/dfd30036/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java
new file mode 100644
index 0000000..2df193c
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.jdbc.configuration.FromJobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+/**
+ * Destroyer for the "from" side of the generic JDBC connector.
+ * It only logs that it ran.
+ */
+public class GenericJdbcFromDestroyer extends Destroyer<ConnectionConfiguration, FromJobConfiguration> {
+
+  private static final Logger LOG = Logger.getLogger(GenericJdbcFromDestroyer.class);
+
+  /**
+   * Logs that the destroyer is running; none of the arguments are read.
+   */
+  @Override
+  public void destroy(DestroyerContext context, ConnectionConfiguration connection, FromJobConfiguration job) {
+    LOG.info("Running generic JDBC connector destroyer");
+  }
+
+}