You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2012/09/21 22:05:31 UTC
[1/2] git commit: SQOOP-608 Implement job resource from end to end
Updated Branches:
refs/heads/sqoop2 3a88f0b48 -> 866d46dfe
SQOOP-608 Implement job resource from end to end
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/866d46df
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/866d46df
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/866d46df
Branch: refs/heads/sqoop2
Commit: 866d46dfe708d4fde03fc0af9e27a63fa303f082
Parents: 3a88f0b
Author: Bilung Lee <bl...@apache.org>
Authored: Thu Sep 20 14:47:31 2012 -0700
Committer: Bilung Lee <bl...@apache.org>
Committed: Thu Sep 20 14:47:31 2012 -0700
----------------------------------------------------------------------
.gitignore | 3 +
.../apache/sqoop/client/request/JobRequest.java | 81 ++++
.../apache/sqoop/client/shell/CreateCommand.java | 7 +-
.../client/shell/CreateConnectionFunction.java | 4 +-
.../sqoop/client/shell/CreateJobFunction.java | 192 ++++++++++
.../apache/sqoop/client/shell/DeleteCommand.java | 7 +-
.../sqoop/client/shell/DeleteJobFunction.java | 67 ++++
.../org/apache/sqoop/client/shell/ShowCommand.java | 7 +-
.../apache/sqoop/client/shell/ShowJobFunction.java | 107 ++++++
.../apache/sqoop/client/shell/UpdateCommand.java | 9 +-
.../client/shell/UpdateConnectionFunction.java | 6 +-
.../sqoop/client/shell/UpdateJobFunction.java | 136 +++++++
.../org/apache/sqoop/client/utils/FormFiller.java | 6 +-
.../main/java/org/apache/sqoop/json/JobBean.java | 186 +++++++++
.../java/org/apache/sqoop/json/ValidationBean.java | 41 ++-
.../src/main/java/org/apache/sqoop/model/MJob.java | 78 ++++-
.../java/org/apache/sqoop/model/ModelError.java | 6 +-
.../java/org/apache/sqoop/json/TestJobBean.java | 68 ++++
.../test/java/org/apache/sqoop/json/TestUtil.java | 19 +-
.../sqoop/connector/jdbc/GenericJdbcValidator.java | 4 +
.../apache/sqoop/repository/JdbcRepository.java | 87 +++++
.../sqoop/repository/JdbcRepositoryHandler.java | 55 +++
.../org/apache/sqoop/repository/Repository.java | 41 ++-
.../apache/sqoop/repository/RepositoryError.java | 10 +
.../sqoop/repository/derby/DerbyRepoError.java | 23 +-
.../repository/derby/DerbyRepositoryHandler.java | 295 ++++++++++++++-
.../repository/derby/DerbySchemaConstants.java | 2 +
.../sqoop/repository/derby/DerbySchemaQuery.java | 68 +++-
.../apache/sqoop/handler/JobRequestHandler.java | 212 +++++++++++
.../org/apache/sqoop/server/v1/JobServlet.java | 56 +++
server/src/main/webapp/WEB-INF/web.xml | 12 +
.../org/apache/sqoop/validation/Validator.java | 3 +-
32 files changed, 1831 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index beef00d..34ffe18 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,6 @@
.project
.settings
target
+*.iml
+*.log
+.idea
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java b/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java
new file mode 100644
index 0000000..b93e407
--- /dev/null
+++ b/client/src/main/java/org/apache/sqoop/client/request/JobRequest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.client.request;
+
+import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.json.ValidationBean;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.validation.Status;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+/**
+ * Provide CRUD semantics over RESTful HTTP API for jobs. All operations
+ * are normally supported.
+ */
+public class JobRequest extends Request {
+
+ public static final String RESOURCE = "v1/job/";
+
+ public JobBean read(String serverUrl, String xid) {
+ String response = null;
+ if (xid == null) {
+ response = super.get(serverUrl + RESOURCE + "all");
+ } else {
+ response = super.get(serverUrl + RESOURCE + xid);
+ }
+ JSONObject jsonObject = (JSONObject) JSONValue.parse(response);
+
+ JobBean jobBean = new JobBean();
+ jobBean.restore(jsonObject);
+
+ return jobBean;
+ }
+
+ public Status create(String serverUrl, MJob job) {
+
+ JobBean jobBean = new JobBean(job);
+ JSONObject jobJson = jobBean.extract();
+
+ String response = super.post(serverUrl + RESOURCE,
+ jobJson.toJSONString());
+
+ ValidationBean validationBean = new ValidationBean(job);
+ validationBean.restore((JSONObject) JSONValue.parse(response));
+
+ return validationBean.getStatus();
+ }
+
+ public Status update(String serverUrl, MJob job) {
+
+ JobBean jobBean = new JobBean(job);
+ JSONObject jobJson = jobBean.extract();
+
+ String response = super.put(serverUrl + RESOURCE + job.getPersistenceId(),
+ jobJson.toJSONString());
+
+ ValidationBean validationBean = new ValidationBean(job);
+ validationBean.restore((JSONObject) JSONValue.parse(response));
+
+ return validationBean.getStatus();
+ }
+
+ public void delete(String serverUrl, long id) {
+ super.delete(serverUrl + RESOURCE + id);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/client/src/main/java/org/apache/sqoop/client/shell/CreateCommand.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/shell/CreateCommand.java b/client/src/main/java/org/apache/sqoop/client/shell/CreateCommand.java
index bf4b581..2453543 100644
--- a/client/src/main/java/org/apache/sqoop/client/shell/CreateCommand.java
+++ b/client/src/main/java/org/apache/sqoop/client/shell/CreateCommand.java
@@ -29,6 +29,7 @@ import java.util.List;
public class CreateCommand extends SqoopCommand {
private CreateConnectionFunction connectionFunction;
+ private CreateJobFunction jobFunction;
public CreateCommand(Shell shell) {
super(shell, "create", "\\cr",
@@ -49,7 +50,11 @@ public class CreateCommand extends SqoopCommand {
connectionFunction = new CreateConnectionFunction(io);
}
return connectionFunction.execute(args);
-
+ } else if (func.equals("job")) {
+ if (jobFunction == null) {
+ jobFunction = new CreateJobFunction(io);
+ }
+ return jobFunction.execute(args);
} else {
String msg = "Usage: create " + getUsage();
throw new SqoopException(ClientError.CLIENT_0002, msg);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java b/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java
index aef4b92..904624c 100644
--- a/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java
+++ b/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java
@@ -111,13 +111,13 @@ public class CreateConnectionFunction extends SqoopFunction {
}
// Query connector forms
- if(!fillForms(io, connection.getConnectorPart(),
+ if(!fillForms(io, connection.getConnectorPart().getForms(),
reader, connectorBundle)) {
return;
}
// Query framework forms
- if(!fillForms(io, connection.getFrameworkPart(),
+ if(!fillForms(io, connection.getFrameworkPart().getForms(),
reader, frameworkBundle)) {
return;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/client/src/main/java/org/apache/sqoop/client/shell/CreateJobFunction.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/shell/CreateJobFunction.java b/client/src/main/java/org/apache/sqoop/client/shell/CreateJobFunction.java
new file mode 100644
index 0000000..1a8b901
--- /dev/null
+++ b/client/src/main/java/org/apache/sqoop/client/shell/CreateJobFunction.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.client.shell;
+
+import jline.ConsoleReader;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.client.core.ClientError;
+import org.apache.sqoop.client.core.Environment;
+import org.apache.sqoop.client.request.ConnectionRequest;
+import org.apache.sqoop.client.request.ConnectorRequest;
+import org.apache.sqoop.client.request.FrameworkRequest;
+import org.apache.sqoop.client.request.JobRequest;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.json.ConnectionBean;
+import org.apache.sqoop.json.ConnectorBean;
+import org.apache.sqoop.json.FrameworkBean;
+import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MConnector;
+import org.apache.sqoop.model.MFramework;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.validation.Status;
+import org.codehaus.groovy.tools.shell.IO;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ResourceBundle;
+
+import static org.apache.sqoop.client.utils.FormFiller.*;
+
+/**
+ * Handles creation of new job objects.
+ */
+public class CreateJobFunction extends SqoopFunction {
+
+ private static final String XID = "xid";
+ private static final String TYPE = "type";
+
+ private FrameworkRequest frameworkRequest;
+ private ConnectionRequest connectionRequest;
+ private ConnectorRequest connectorRequest;
+ private JobRequest jobRequest;
+
+ private IO io;
+
+ @SuppressWarnings("static-access")
+ public CreateJobFunction(IO io) {
+ this.io = io;
+
+ this.addOption(OptionBuilder
+ .withDescription("Connection ID")
+ .withLongOpt(XID)
+ .hasArg()
+ .create(XID.charAt(0))
+ );
+ this.addOption(OptionBuilder
+ .withDescription("Job type")
+ .withLongOpt(TYPE)
+ .hasArg()
+ .create(TYPE.charAt(0))
+ );
+ }
+
+ public Object execute(List<String> args) {
+ CommandLine line = parseOptions(this, 1, args);
+ if (!line.hasOption(XID)) {
+ io.out.println("Required argument --xid is missing.");
+ return null;
+ }
+ if (!line.hasOption(TYPE)) {
+ io.out.println("Required argument --type is missing.");
+ return null;
+ }
+
+ try {
+ createJob(line.getOptionValue(XID), line.getOptionValue(TYPE));
+ } catch (IOException ex) {
+ throw new SqoopException(ClientError.CLIENT_0005, ex);
+ }
+
+ return null;
+ }
+
+ private void createJob(String connectionId, String type) throws IOException {
+ io.out.println("Creating job for connection with id " + connectionId);
+
+ ConsoleReader reader = new ConsoleReader();
+
+ FrameworkBean frameworkBean = getFrameworkBean();
+ ConnectionBean connectionBean = getConnectionBean(connectionId);
+ ConnectorBean connectorBean;
+
+ MFramework framework = frameworkBean.getFramework();
+ ResourceBundle frameworkBundle = frameworkBean.getResourceBundle();
+
+ MConnection connection = connectionBean.getConnections().get(0);
+
+ connectorBean = getConnectorBean(connection.getConnectorId());
+ MConnector connector = connectorBean.getConnectors().get(0);
+ ResourceBundle connectorBundle = connectorBean.getResourceBundles().get(0);
+
+ MJob.Type jobType = MJob.Type.valueOf(type.toUpperCase());
+
+ MJob job = new MJob(
+ connector.getPersistenceId(),
+ connection.getPersistenceId(),
+ jobType,
+ connector.getJobForms(jobType),
+ framework.getJobForms(jobType)
+ );
+
+ Status status = Status.FINE;
+
+ io.out.println("Please fill following values to create new job"
+ + " object");
+
+ do {
+ if( !status.canProceed() ) {
+ io.out.println();
+ io.out.println("@|red There are issues with entered data, please"
+ + " revise your input:|@");
+ }
+
+ // Query connector forms
+ if(!fillForms(io, job.getConnectorPart().getForms(),
+ reader, connectorBundle)) {
+ return;
+ }
+
+ // Query framework forms
+ if(!fillForms(io, job.getFrameworkPart().getForms(),
+ reader, frameworkBundle)) {
+ return;
+ }
+
+ // Try to create
+ status = createJob(job);
+ } while(!status.canProceed());
+
+ io.out.println("New job was successfully created with validation "
+ + "status " + status.name());
+ }
+
+
+ private FrameworkBean getFrameworkBean() {
+ if (frameworkRequest == null) {
+ frameworkRequest = new FrameworkRequest();
+ }
+
+ return frameworkRequest.read(Environment.getServerUrl());
+ }
+
+ private ConnectionBean getConnectionBean(String xid) {
+ if (connectionRequest == null) {
+ connectionRequest = new ConnectionRequest();
+ }
+
+ return connectionRequest.read(Environment.getServerUrl(), xid);
+ }
+
+ private ConnectorBean getConnectorBean(long cid) {
+ if (connectorRequest == null) {
+ connectorRequest = new ConnectorRequest();
+ }
+
+ return connectorRequest.read(Environment.getServerUrl(),
+ Long.toString(cid));
+ }
+
+ private Status createJob(MJob job) {
+ if (jobRequest == null) {
+ jobRequest = new JobRequest();
+ }
+
+ return jobRequest.create(Environment.getServerUrl(), job);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/client/src/main/java/org/apache/sqoop/client/shell/DeleteCommand.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/shell/DeleteCommand.java b/client/src/main/java/org/apache/sqoop/client/shell/DeleteCommand.java
index 1616b4a..1ea773b 100644
--- a/client/src/main/java/org/apache/sqoop/client/shell/DeleteCommand.java
+++ b/client/src/main/java/org/apache/sqoop/client/shell/DeleteCommand.java
@@ -29,6 +29,7 @@ import java.util.List;
public class DeleteCommand extends SqoopCommand {
private DeleteConnectionFunction connectionFunction;
+ private DeleteJobFunction jobFunction;
public DeleteCommand(Shell shell) {
super(shell, "delete", "\\d",
@@ -50,7 +51,11 @@ public class DeleteCommand extends SqoopCommand {
connectionFunction = new DeleteConnectionFunction(io);
}
return connectionFunction.execute(args);
-
+ } else if (func.equals("job")) {
+ if (jobFunction == null) {
+ jobFunction = new DeleteJobFunction(io);
+ }
+ return jobFunction.execute(args);
} else {
String msg = "Usage: delete " + getUsage();
throw new SqoopException(ClientError.CLIENT_0002, msg);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/client/src/main/java/org/apache/sqoop/client/shell/DeleteJobFunction.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/shell/DeleteJobFunction.java b/client/src/main/java/org/apache/sqoop/client/shell/DeleteJobFunction.java
new file mode 100644
index 0000000..08f37d8
--- /dev/null
+++ b/client/src/main/java/org/apache/sqoop/client/shell/DeleteJobFunction.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.client.shell;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.client.core.Environment;
+import org.apache.sqoop.client.request.JobRequest;
+import org.codehaus.groovy.tools.shell.IO;
+
+import java.util.List;
+
+/**
+ * Handles deletion of a job object.
+ */
+public class DeleteJobFunction extends SqoopFunction {
+
+ private IO io;
+
+ private JobRequest jobRequest;
+
+ private static final String JID = "jid";
+
+ @SuppressWarnings("static-access")
+ public DeleteJobFunction(IO io) {
+ this.io = io;
+
+ this.addOption(OptionBuilder
+ .withDescription("Job ID")
+ .withLongOpt(JID)
+ .hasArg()
+ .create('j'));
+ }
+
+ public Object execute(List<String> args) {
+ CommandLine line = parseOptions(this, 1, args);
+ if (!line.hasOption(JID)) {
+ io.out.println("Required argument --jid is missing.");
+ return null;
+ }
+
+ if (jobRequest == null) {
+ jobRequest = new JobRequest();
+ }
+
+ jobRequest.delete(Environment.getServerUrl(),
+ Long.valueOf(line.getOptionValue(JID)));
+
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/client/src/main/java/org/apache/sqoop/client/shell/ShowCommand.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/shell/ShowCommand.java b/client/src/main/java/org/apache/sqoop/client/shell/ShowCommand.java
index 22182e9..7c8ab5c 100644
--- a/client/src/main/java/org/apache/sqoop/client/shell/ShowCommand.java
+++ b/client/src/main/java/org/apache/sqoop/client/shell/ShowCommand.java
@@ -28,6 +28,7 @@ public class ShowCommand extends SqoopCommand
private ShowServerFunction serverFunction;
private ShowVersionFunction versionFunction;
private ShowConnectorFunction connectorFunction;
+ private ShowJobFunction jobFunction;
private ShowFrameworkFunction frameworkFunction;
private ShowConnectionFunction connectionFunction;
@@ -78,8 +79,10 @@ public class ShowCommand extends SqoopCommand
return connectionFunction.execute(args);
} else if (func.equals("job")) {
- return null;
-
+ if (jobFunction == null) {
+ jobFunction = new ShowJobFunction(io);
+ }
+ return jobFunction.execute(args);
} else {
String msg = "Usage: show " + getUsage();
throw new SqoopException(ClientError.CLIENT_0002, msg);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/client/src/main/java/org/apache/sqoop/client/shell/ShowJobFunction.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/shell/ShowJobFunction.java b/client/src/main/java/org/apache/sqoop/client/shell/ShowJobFunction.java
new file mode 100644
index 0000000..19b11cc
--- /dev/null
+++ b/client/src/main/java/org/apache/sqoop/client/shell/ShowJobFunction.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.client.shell;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.client.core.Environment;
+import org.apache.sqoop.client.request.JobRequest;
+import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.model.MJob;
+import org.codehaus.groovy.tools.shell.IO;
+
+import java.io.PrintWriter;
+import java.util.List;
+
+import static org.apache.sqoop.client.utils.FormDisplayer.*;
+
+/**
+ * Handles display of job objects.
+ */
+public class ShowJobFunction extends SqoopFunction {
+
+ public static final String ALL = "all";
+ public static final String JID = "jid";
+
+ private IO io;
+ private JobRequest jobRequest;
+
+ @SuppressWarnings("static-access")
+ protected ShowJobFunction(IO io) {
+ this.io = io;
+
+ this.addOption(OptionBuilder
+ .withDescription("Display all jobs")
+ .withLongOpt(ALL)
+ .create(ALL.charAt(0)));
+ this.addOption(OptionBuilder.hasArg().withArgName("jid")
+ .withDescription("Display job with given jid" )
+ .withLongOpt(JID)
+ .create('j'));
+ }
+
+ public void printHelp(PrintWriter out) {
+ out.println("Usage: show job");
+ super.printHelp(out);
+ }
+
+ public Object execute(List<String> args) {
+ if (args.size() == 1) {
+ printHelp(io.out);
+ io.out.println();
+ return null;
+ }
+
+ CommandLine line = parseOptions(this, 1, args);
+ if (line.hasOption(ALL)) {
+ showJob(null);
+
+ } else if (line.hasOption(JID)) {
+ showJob(line.getOptionValue(JID));
+ }
+
+ return null;
+ }
+
+ private void showJob(String jid) {
+ if (jobRequest == null) {
+ jobRequest = new JobRequest();
+ }
+ JobBean jobBean = jobRequest.read(Environment.getServerUrl(), jid);
+
+ List<MJob> jobs = jobBean.getJobs();
+
+ io.out.println("@|bold " + jobs.size()
+ + " job(s) to show: |@");
+
+ for (MJob job : jobs) {
+ io.out.println("Job with id " + job.getPersistenceId()
+ + " and name: " + job.getName());
+
+ long connectorId = job.getConnectorId();
+
+ // Display connector part
+ displayForms(io,
+ job.getConnectorPart().getForms(),
+ jobBean.getConnectorBundle(connectorId));
+ displayForms(io,
+ job.getFrameworkPart().getForms(),
+ jobBean.getFrameworkBundle());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/client/src/main/java/org/apache/sqoop/client/shell/UpdateCommand.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/shell/UpdateCommand.java b/client/src/main/java/org/apache/sqoop/client/shell/UpdateCommand.java
index ad72a4e..5bac209 100644
--- a/client/src/main/java/org/apache/sqoop/client/shell/UpdateCommand.java
+++ b/client/src/main/java/org/apache/sqoop/client/shell/UpdateCommand.java
@@ -26,9 +26,10 @@ import java.util.List;
/**
*
*/
-public class UpdateCommand extends SqoopCommand {
+public class UpdateCommand extends SqoopCommand {
private UpdateConnectionFunction connectionFunction;
+ private UpdateJobFunction jobFunction;
public UpdateCommand(Shell shell) {
super(shell, "update", "\\up",
@@ -49,7 +50,11 @@ public class UpdateCommand extends SqoopCommand {
connectionFunction = new UpdateConnectionFunction(io);
}
return connectionFunction.execute(args);
-
+ } else if (func.equals("job")) {
+ if (jobFunction == null) {
+ jobFunction = new UpdateJobFunction(io);
+ }
+ return jobFunction.execute(args);
} else {
String msg = "Usage: update " + getUsage();
throw new SqoopException(ClientError.CLIENT_0002, msg);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/client/src/main/java/org/apache/sqoop/client/shell/UpdateConnectionFunction.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/shell/UpdateConnectionFunction.java b/client/src/main/java/org/apache/sqoop/client/shell/UpdateConnectionFunction.java
index c9ced51..1d64296 100644
--- a/client/src/main/java/org/apache/sqoop/client/shell/UpdateConnectionFunction.java
+++ b/client/src/main/java/org/apache/sqoop/client/shell/UpdateConnectionFunction.java
@@ -99,13 +99,13 @@ public class UpdateConnectionFunction extends SqoopFunction {
}
// Query connector forms
- if(!fillForms(io, connection.getConnectorPart(),
+ if(!fillForms(io, connection.getConnectorPart().getForms(),
reader, connectorBundle)) {
return;
}
// Query framework forms
- if(!fillForms(io, connection.getFrameworkPart(),
+ if(!fillForms(io, connection.getFrameworkPart().getForms(),
reader, frameworkBundle)) {
return;
}
@@ -114,7 +114,7 @@ public class UpdateConnectionFunction extends SqoopFunction {
status = updateConnection(connection);
} while(!status.canProceed());
- io.out.println("Connection was sucessfully updated with status "
+ io.out.println("Connection was successfully updated with status "
+ status.name());
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/client/src/main/java/org/apache/sqoop/client/shell/UpdateJobFunction.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/shell/UpdateJobFunction.java b/client/src/main/java/org/apache/sqoop/client/shell/UpdateJobFunction.java
new file mode 100644
index 0000000..58749e9
--- /dev/null
+++ b/client/src/main/java/org/apache/sqoop/client/shell/UpdateJobFunction.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.client.shell;
+
+import jline.ConsoleReader;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.sqoop.client.core.ClientError;
+import org.apache.sqoop.client.core.Environment;
+import org.apache.sqoop.client.request.JobRequest;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.json.JobBean;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.validation.Status;
+import org.codehaus.groovy.tools.shell.IO;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ResourceBundle;
+
+import static org.apache.sqoop.client.utils.FormFiller.fillForms;
+
+/**
+ * Handles update of an existing job object.
+ */
+public class UpdateJobFunction extends SqoopFunction {
+
+ private static final String JID = "jid";
+
+ private JobRequest jobRequest;
+
+ private IO io;
+
+ @SuppressWarnings("static-access")
+ public UpdateJobFunction(IO io) {
+ this.io = io;
+
+ this.addOption(OptionBuilder
+ .withDescription("Job ID")
+ .withLongOpt(JID)
+ .hasArg()
+ .create(JID.charAt(0)));
+ }
+
+ public Object execute(List<String> args) {
+ CommandLine line = parseOptions(this, 1, args);
+ if (!line.hasOption(JID)) {
+ io.out.println("Required argument --jid is missing.");
+ return null;
+ }
+
+ try {
+ updateJob(line.getOptionValue(JID));
+ } catch (IOException ex) {
+ throw new SqoopException(ClientError.CLIENT_0005, ex);
+ }
+
+ return null;
+ }
+
+ private void updateJob(String jobId) throws IOException {
+ io.out.println("Updating job with id " + jobId);
+
+ ConsoleReader reader = new ConsoleReader();
+
+ JobBean jobBean = readJob(jobId);
+
+ // TODO(jarcec): Check that we have expected data
+ MJob job = jobBean.getJobs().get(0);
+ ResourceBundle frameworkBundle
+ = jobBean.getFrameworkBundle();
+ ResourceBundle connectorBundle
+ = jobBean.getConnectorBundle(job.getConnectorId());
+
+ Status status = Status.FINE;
+
+ io.out.println("Please update job metadata:");
+
+ do {
+ if( !status.canProceed() ) {
+ io.out.println();
+ io.out.println("@|red There are issues with entered data, please"
+ + " revise your input:|@");
+ }
+
+ // Query connector forms
+ if(!fillForms(io, job.getConnectorPart().getForms(),
+ reader, connectorBundle)) {
+ return;
+ }
+
+ // Query framework forms
+ if(!fillForms(io, job.getFrameworkPart().getForms(),
+ reader, frameworkBundle)) {
+ return;
+ }
+
+ // Try to update
+ status = updateJob(job);
+ } while(!status.canProceed());
+
+ io.out.println("Job was successfully updated with status "
+ + status.name());
+ }
+
+ private Status updateJob(MJob job) {
+ if (jobRequest == null) {
+ jobRequest = new JobRequest();
+ }
+
+ return jobRequest.update(Environment.getServerUrl(), job);
+ }
+
+ private JobBean readJob(String jobId) {
+ if (jobRequest == null) {
+ jobRequest = new JobRequest();
+ }
+
+ return jobRequest.read(Environment.getServerUrl(), jobId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/client/src/main/java/org/apache/sqoop/client/utils/FormFiller.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/utils/FormFiller.java b/client/src/main/java/org/apache/sqoop/client/utils/FormFiller.java
index 5b6ebb9..af3ff8a 100644
--- a/client/src/main/java/org/apache/sqoop/client/utils/FormFiller.java
+++ b/client/src/main/java/org/apache/sqoop/client/utils/FormFiller.java
@@ -18,13 +18,13 @@
package org.apache.sqoop.client.utils;
import jline.ConsoleReader;
-import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MStringInput;
import org.codehaus.groovy.tools.shell.IO;
import java.io.IOException;
+import java.util.List;
import java.util.ResourceBundle;
/**
@@ -34,11 +34,11 @@ public class FormFiller {
public static boolean fillForms(IO io,
- MConnectionForms formsMetadata,
+ List<MForm> forms,
ConsoleReader reader,
ResourceBundle bundle)
throws IOException {
- for (MForm form : formsMetadata.getForms()) {
+ for (MForm form : forms) {
if(!fillForm(io, form, reader, bundle)) {
return false;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/common/src/main/java/org/apache/sqoop/json/JobBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/JobBean.java b/common/src/main/java/org/apache/sqoop/json/JobBean.java
new file mode 100644
index 0000000..7fd2ce5
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/json/JobBean.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.json;
+
+import org.apache.sqoop.model.MForm;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MJobForms;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ResourceBundle;
+import java.util.Set;
+
+import static org.apache.sqoop.json.util.FormSerialization.*;
+import static org.apache.sqoop.json.util.ResourceBundleSerialization.*;
+
+/**
+ * JSON bean that moves one or more {@link MJob} objects, plus optional resource bundles, to and from JSON.
+ */
+public class JobBean implements JsonBean {
+
+ private static final String ALL = "all";
+ private static final String ID = "id";
+ private static final String NAME = "name";
+ private static final String TYPE = "type";
+ private static final String CONNECTION_ID = "connection-id";
+ private static final String CONNECTOR_ID = "connector-id";
+ private static final String CONNECTOR_PART = "connector";
+ private static final String FRAMEWORK_PART = "framework";
+
+ // Compulsory
+ private List<MJob> jobs;
+
+ // Optional
+ private Map<Long, ResourceBundle> connectorBundles;
+ private ResourceBundle frameworkBundle;
+
+ // For "extract"
+ public JobBean(MJob job) {
+ this();
+ this.jobs = new ArrayList<MJob>();
+ this.jobs.add(job);
+ }
+
+ public JobBean(List<MJob> jobs) { // keeps a reference to the caller's list, no copy is made
+ this();
+ this.jobs = jobs;
+ }
+
+ // For "restore"
+ public JobBean() { // jobs stays null until restore() or another constructor sets it
+ connectorBundles = new HashMap<Long, ResourceBundle>();
+ }
+
+ public void setFrameworkBundle(ResourceBundle frameworkBundle) {
+ this.frameworkBundle = frameworkBundle;
+ }
+
+ public void addConnectorBundle(Long id, ResourceBundle connectorBundle) { // bundles are keyed by connector id
+ connectorBundles.put(id, connectorBundle);
+ }
+
+ public boolean hasConnectorBundle(Long id) {
+ return connectorBundles.containsKey(id);
+ }
+
+ public List<MJob> getJobs() {
+ return jobs;
+ }
+
+ public ResourceBundle getConnectorBundle(Long id) { // null when no bundle was stored under this id
+ return connectorBundles.get(id);
+ }
+
+ public ResourceBundle getFrameworkBundle() {
+ return frameworkBundle;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public JSONObject extract() { // Serializes all jobs under "all" and appends the bundles only when present
+ JSONArray array = new JSONArray();
+
+ for(MJob job : jobs) {
+ JSONObject object = new JSONObject();
+
+ object.put(ID, job.getPersistenceId());
+ object.put(NAME, job.getName());
+ object.put(TYPE, job.getType().name()); // enum constant name, read back with MJob.Type.valueOf in restore()
+ object.put(CONNECTION_ID, job.getConnectionId());
+ object.put(CONNECTOR_ID, job.getConnectorId());
+ object.put(CONNECTOR_PART,
+ extractForms(job.getConnectorPart().getForms()));
+ object.put(FRAMEWORK_PART,
+ extractForms(job.getFrameworkPart().getForms()));
+
+ array.add(object);
+ }
+
+ JSONObject all = new JSONObject();
+ all.put(ALL, array);
+
+ if(!connectorBundles.isEmpty()) { // connector resources are optional and omitted when none were added
+ JSONObject bundles = new JSONObject();
+
+ for(Map.Entry<Long, ResourceBundle> entry : connectorBundles.entrySet()) {
+ bundles.put(entry.getKey().toString(),
+ extractResourceBundle(entry.getValue()));
+ }
+
+ all.put(CONNECTOR_RESOURCES, bundles);
+ }
+ if(frameworkBundle != null) { // framework resources are optional as well
+ all.put(FRAMEWORK_RESOURCES,extractResourceBundle(frameworkBundle));
+ }
+ return all;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void restore(JSONObject jsonObject) { // Rebuilds the job list from JSON; restored bundles are added to the existing map
+ jobs = new ArrayList<MJob>();
+
+ JSONArray array = (JSONArray) jsonObject.get(ALL);
+
+ for (Object obj : array) {
+ JSONObject object = (JSONObject) obj;
+
+ long connectorId = (Long) object.get(CONNECTOR_ID);
+ long connectionId = (Long) object.get(CONNECTION_ID);
+ JSONArray connectorPart = (JSONArray) object.get(CONNECTOR_PART);
+ JSONArray frameworkPart = (JSONArray) object.get(FRAMEWORK_PART);
+
+ String stringType = (String) object.get(TYPE);
+ MJob.Type type = MJob.Type.valueOf(stringType);
+
+ List<MForm> connectorForms = restoreForms(connectorPart);
+ List<MForm> frameworkForms = restoreForms(frameworkPart);
+
+ MJob job = new MJob( // both form parts are wrapped with the job's own type
+ connectorId,
+ connectionId,
+ type,
+ new MJobForms(type, connectorForms),
+ new MJobForms(type, frameworkForms)
+ );
+
+ job.setPersistenceId((Long) object.get(ID));
+ job.setName((String) object.get(NAME));
+
+ jobs.add(job);
+ }
+
+ if(jsonObject.containsKey(CONNECTOR_RESOURCES)) {
+ JSONObject bundles = (JSONObject) jsonObject.get(CONNECTOR_RESOURCES);
+ Set<Map.Entry<String, JSONObject>> entrySet = bundles.entrySet();
+ for (Map.Entry<String, JSONObject> entry : entrySet) {
+ connectorBundles.put(Long.parseLong(entry.getKey()), // keys were written with Long.toString in extract()
+ restoreResourceBundle(entry.getValue()));
+ }
+ }
+ if(jsonObject.containsKey(FRAMEWORK_RESOURCES)) {
+ frameworkBundle = restoreResourceBundle(
+ (JSONObject) jsonObject.get(FRAMEWORK_RESOURCES));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/common/src/main/java/org/apache/sqoop/json/ValidationBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/json/ValidationBean.java b/common/src/main/java/org/apache/sqoop/json/ValidationBean.java
index 73b73fa..b56f6ed 100644
--- a/common/src/main/java/org/apache/sqoop/json/ValidationBean.java
+++ b/common/src/main/java/org/apache/sqoop/json/ValidationBean.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.json;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MInput;
+import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MValidatedElement;
import org.apache.sqoop.validation.Status;
import org.json.simple.JSONObject;
@@ -34,6 +35,7 @@ import java.util.List;
public class ValidationBean implements JsonBean {
private static final String STATUS = "status";
+ private static final String TYPE = "type";
private static final String CONNECTOR_PART = "connector";
private static final String FRAMEWORK_PART = "framework";
@@ -41,6 +43,7 @@ public class ValidationBean implements JsonBean {
private static final String MESSAGE = "message";
private MConnection connection;
+ private MJob job;
private Status status;
// For "extract"
@@ -48,15 +51,25 @@ public class ValidationBean implements JsonBean {
this.connection = connection;
this.status = status;
}
+ public ValidationBean(MJob job, Status status) { // job counterpart of the (connection, status) constructor; connection stays unset
+ this.job = job;
+ this.status = status;
+ }
// For "restore"
public ValidationBean(MConnection connection) {
this.connection = connection;
}
+ public ValidationBean(MJob job) { // job counterpart of the single-argument connection constructor; status is not set here
+ this.job = job;
+ }
public MConnection getConnection() {
return connection;
}
+ public MJob getJob() { // Returns the job this bean was built with, or null when it wraps a connection
+ return job;
+ }
public Status getStatus() {
return status;
@@ -67,11 +80,22 @@ public class ValidationBean implements JsonBean {
public JSONObject extract() {
JSONObject object = new JSONObject();
+ List<MForm> connectorPart = null;
+ List<MForm> frameworkPart = null;
+
+ if(connection != null) {
+ connectorPart = connection.getConnectorPart().getForms();
+ frameworkPart = connection.getFrameworkPart().getForms();
+ object.put(TYPE, "CONNECTION");
+ } else if (job != null) {
+ connectorPart = job.getConnectorPart().getForms();
+ frameworkPart = job.getFrameworkPart().getForms();
+ object.put(TYPE, "FRAMEWORK");
+ }
+
object.put(STATUS, status.name());
- object.put(CONNECTOR_PART,
- extractForms(connection.getConnectorPart().getForms()));
- object.put(FRAMEWORK_PART,
- extractForms(connection.getFrameworkPart().getForms()));
+ object.put(CONNECTOR_PART, extractForms(connectorPart));
+ object.put(FRAMEWORK_PART, extractForms(frameworkPart));
return object;
}
@@ -110,8 +134,13 @@ public class ValidationBean implements JsonBean {
JSONObject connectorPart = (JSONObject) jsonObject.get(CONNECTOR_PART);
JSONObject frameworkPart = (JSONObject) jsonObject.get(FRAMEWORK_PART);
- restoreForms(connectorPart, connection.getConnectorPart().getForms());
- restoreForms(frameworkPart, connection.getFrameworkPart().getForms());
+ if(connection != null) {
+ restoreForms(connectorPart, connection.getConnectorPart().getForms());
+ restoreForms(frameworkPart, connection.getFrameworkPart().getForms());
+ } else if (job != null) {
+ restoreForms(connectorPart, job.getConnectorPart().getForms());
+ restoreForms(frameworkPart, job.getFrameworkPart().getForms());
+ }
}
private void restoreForms(JSONObject json, List<MForm> forms) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/common/src/main/java/org/apache/sqoop/model/MJob.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MJob.java b/common/src/main/java/org/apache/sqoop/model/MJob.java
index d45ef5e..186716a 100644
--- a/common/src/main/java/org/apache/sqoop/model/MJob.java
+++ b/common/src/main/java/org/apache/sqoop/model/MJob.java
@@ -17,23 +17,65 @@
*/
package org.apache.sqoop.model;
+import org.apache.sqoop.common.SqoopException;
+
/**
* Model describing entire job object including both connector and
* framework part.
*/
-public class MJob extends MNamedElement {
+public class MJob extends MPersistableEntity {
static public enum Type {
IMPORT,
EXPORT,
}
- // TODO(jarcec): We probably need reference to connection object here
+ /**
+ * Connector reference.
+ *
+ * A job object does not directly depend on a connector, as there is an indirect
+ * dependency through the connection object, but carrying this dependency along
+ * explicitly helps a lot.
+ */
+ long connectorId;
+
+ /**
+ * Corresponding connection object.
+ */
+ long connectionId;
+
+ /**
+ * User name for this object
+ */
+ String name;
+
+ /**
+ * Job type
+ */
+ Type type;
+
MJobForms connectorPart;
MJobForms frameworkPart;
- public MJob(String name) {
- super(name);
+ public MJob(long connectorId, // Creates a job from connector/connection ids, a job type and both form parts
+ long connectionId,
+ Type type,
+ MJobForms connectorPart,
+ MJobForms frameworkPart) {
+ this.connectorId = connectorId;
+ this.connectionId = connectionId;
+ this.type = type;
+ this.connectorPart = connectorPart;
+ this.frameworkPart = frameworkPart;
+
+ // Check that we're operating on forms with same type
+ if (type != connectorPart.getType() || type != frameworkPart.getType()) { // either part disagreeing with the job type is rejected
+ throw new SqoopException(ModelError.MODEL_002,
+ "Incompatible types, job: " + type.name()
+ + ", connector part: " + connectorPart.getType().name()
+ + ", framework part: " + frameworkPart.getType().name()
+ );
+ }
}
@Override
@@ -43,4 +85,32 @@ public class MJob extends MNamedElement {
return sb.toString();
}
+
+ public String getName() { // Returns the name set through setName(); the constructor does not assign one
+ return name;
+ }
+
+ public void setName(String name) { // Replaces the job name; no validation is performed here
+ this.name = name;
+ }
+
+ public long getConnectionId() { // Returns the connection id passed to the constructor
+ return connectionId;
+ }
+
+ public long getConnectorId() { // Returns the connector id passed to the constructor
+ return connectorId;
+ }
+
+ public MJobForms getConnectorPart() { // Returns the connector forms object itself, not a copy
+ return connectorPart;
+ }
+
+ public MJobForms getFrameworkPart() { // Returns the framework forms object itself, not a copy
+ return frameworkPart;
+ }
+
+ public Type getType() { // Returns the job type passed to the constructor
+ return type;
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/common/src/main/java/org/apache/sqoop/model/ModelError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/ModelError.java b/common/src/main/java/org/apache/sqoop/model/ModelError.java
index 2864bef..7a9ebd8 100644
--- a/common/src/main/java/org/apache/sqoop/model/ModelError.java
+++ b/common/src/main/java/org/apache/sqoop/model/ModelError.java
@@ -24,7 +24,11 @@ import org.apache.sqoop.common.ErrorCode;
*/
public enum ModelError implements ErrorCode {
- MODEL_001("Attempt to pass two different set of MForms for single job type.");
+ MODEL_001("Attempt to pass two different set of MForms for single job type."),
+
+ MODEL_002("Creating MJob of different job types"),
+
+ ;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/common/src/test/java/org/apache/sqoop/json/TestJobBean.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/json/TestJobBean.java b/common/src/test/java/org/apache/sqoop/json/TestJobBean.java
new file mode 100644
index 0000000..521b2b9
--- /dev/null
+++ b/common/src/test/java/org/apache/sqoop/json/TestJobBean.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.json;
+
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MStringInput;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.ParseException;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static org.apache.sqoop.json.TestUtil.getJob;
+
+/**
+ * Round-trip test: a job serialized by JobBean.extract() must come back intact from JobBean.restore().
+ */
+public class TestJobBean {
+ @Test
+ public void testSerialization() throws ParseException {
+ MJob job = getJob("ahoj", MJob.Type.IMPORT);
+ job.setName("The big job");
+ job.setPersistenceId(666);
+
+ // Fill in a value on the first input of the first connector form
+ MStringInput input = (MStringInput) job.getConnectorPart().getForms()
+ .get(0).getInputs().get(0);
+ input.setValue("Hi there!");
+
+ // Serialize it to JSON object
+ JobBean bean = new JobBean(job);
+ JSONObject json = bean.extract();
+
+ // "Move" it across network in text form
+ String string = json.toJSONString();
+
+ // Retrieve the transferred object through a fresh, empty bean
+ JSONObject retrievedJson = (JSONObject)JSONValue.parseWithException(string);
+ JobBean retrievedBean = new JobBean();
+ retrievedBean.restore(retrievedJson);
+ MJob target = retrievedBean.getJobs().get(0);
+
+ // Check id, type and name
+ assertEquals(666, target.getPersistenceId());
+ assertEquals(MJob.Type.IMPORT, target.getType());
+ assertEquals("The big job", target.getName());
+
+ // Test that the input value was correctly moved
+ MStringInput targetInput = (MStringInput) target.getConnectorPart()
+ .getForms().get(0).getInputs().get(0);
+ assertEquals("Hi there!", targetInput.getValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/common/src/test/java/org/apache/sqoop/json/TestUtil.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/json/TestUtil.java b/common/src/test/java/org/apache/sqoop/json/TestUtil.java
index 57bfa91..791e999 100644
--- a/common/src/test/java/org/apache/sqoop/json/TestUtil.java
+++ b/common/src/test/java/org/apache/sqoop/json/TestUtil.java
@@ -50,7 +50,16 @@ public class TestUtil {
public static MConnection getConnection(String name) {
return new MConnection(1,
getConnector(name).getConnectionForms(),
- getFramework().getConnectionForms());
+ getFramework().getConnectionForms()
+ );
+ }
+
+ public static MJob getJob(String name, MJob.Type type) { // Builds a test job of the given type; name is forwarded to getConnector(name)
+ return new MJob(1, 1, // connector id 1 and connection id 1
+ type,
+ getConnector(name).getJobForms(type),
+ getFramework().getJobForms(type)
+ );
}
public static MConnectionForms getConnectionForms() {
@@ -79,7 +88,7 @@ public class TestUtil {
return new MConnectionForms(connectionForms);
}
- public static List<MJobForms> getAllJobForms() {
+ public static MJobForms getJobForms(MJob.Type type) {
List<MInput<?>> inputs;
MStringInput input;
MForm form;
@@ -121,8 +130,12 @@ public class TestUtil {
form.setPersistenceId(12);
jobForms.add(form);
+ return new MJobForms(type, jobForms);
+ }
+
+ public static List<MJobForms> getAllJobForms() {
List<MJobForms> jobs = new ArrayList<MJobForms>();
- jobs.add(new MJobForms(MJob.Type.IMPORT, jobForms));
+ jobs.add(getJobForms(MJob.Type.IMPORT));
return jobs;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
index d307aff..f73f76e 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcValidator.java
@@ -37,6 +37,10 @@ public class GenericJdbcValidator extends Validator {
return validateConnectionForm(form);
}
+ if(form.getName().equals(FORM_TABLE)) {
+ return Status.ACCEPTABLE;
+ }
+
// This do not seem as our form
return Status.UNACCEPTABLE;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index 7a38e4b..be23ebf 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -25,6 +25,7 @@ import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MFramework;
+import org.apache.sqoop.model.MJob;
public class JdbcRepository implements Repository {
@@ -239,4 +240,90 @@ public class JdbcRepository implements Repository {
}
});
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void createJob(final MJob job) {
+ doWithConnection(new DoWithConnection() {
+ @Override
+ public Object doIt(Connection conn) {
+ if(job.hasPersistenceId()) { // a job that already carries an id is rejected rather than inserted again
+ throw new SqoopException(RepositoryError.JDBCREPO_0018);
+ }
+
+ handler.createJob(job, conn);
+ return null; // nothing to hand back to the caller
+ }
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void updateJob(final MJob job) {
+ doWithConnection(new DoWithConnection() {
+ @Override
+ public Object doIt(Connection conn) {
+ if(!job.hasPersistenceId()) { // only a job that already carries an id can be updated
+ throw new SqoopException(RepositoryError.JDBCREPO_0019);
+ }
+ if(!handler.existsJob(job.getPersistenceId(), conn)) { // the id is a job id, so it must be looked up among jobs, not connections
+ throw new SqoopException(RepositoryError.JDBCREPO_0020,
+ "Invalid id: " + job.getPersistenceId());
+ }
+
+ handler.updateJob(job, conn);
+ return null; // nothing to hand back to the caller
+ }
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void deleteJob(final long id) {
+ doWithConnection(new DoWithConnection() {
+ @Override
+ public Object doIt(Connection conn) {
+ if(!handler.existsJob(id, conn)) { // unknown ids are reported instead of silently ignored
+ throw new SqoopException(RepositoryError.JDBCREPO_0020,
+ "Invalid id: " + id);
+ }
+
+ handler.deleteJob(id, conn);
+ return null; // nothing to hand back to the caller
+ }
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public MJob findJob(final long id) {
+ return (MJob) doWithConnection(new DoWithConnection() { // doIt() is declared to return Object, hence the cast
+ @Override
+ public Object doIt(Connection conn) {
+ return handler.findJob(id, conn); // no existence check here; the handler's result is passed through as-is
+ }
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<MJob> findJobs() {
+ return (List<MJob>) doWithConnection(new DoWithConnection() { // unchecked cast: doIt() is declared to return Object
+ @Override
+ public Object doIt(Connection conn) {
+ return handler.findJobs(conn);
+ }
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index 95a0737..aaca7f7 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MFramework;
+import org.apache.sqoop.model.MJob;
/**
* Set of methods required from each JDBC based repository.
@@ -159,4 +160,58 @@ public interface JdbcRepositoryHandler {
* @return List will all saved connection objects
*/
public List<MConnection> findConnections(Connection conn);
+
+
+ /**
+ * Save given job to repository. This job object must not be already
+ * present in the repository otherwise exception will be thrown.
+ *
+ * @param job Job object to serialize into repository.
+ * @param conn Connection to metadata repository
+ */
+ public void createJob(MJob job, Connection conn);
+
+ /**
+ * Update given job representation in repository. This job object must
+ * already exist in the repository, otherwise an exception will be
+ * thrown.
+ *
+ * @param job Job object that should be updated in repository.
+ * @param conn Connection to metadata repository
+ */
+ public void updateJob(MJob job, Connection conn);
+
+ /**
+ * Check if given job exists in metastore.
+ *
+ * @param id Job id
+ * @param conn Connection to metadata repository
+ * @return True if the job exists
+ */
+ public boolean existsJob(long id, Connection conn);
+
+ /**
+ * Delete job with given id from metadata repository.
+ *
+ * @param id Id of the job that should be removed from repository
+ * @param conn Connection to metadata repository
+ */
+ public void deleteJob(long id, Connection conn);
+
+ /**
+ * Find job with given id in repository.
+ *
+ * @param id Job id
+ * @param conn Connection to metadata repository
+ * @return Deserialized form of the job that is present in the repository
+ */
+ public MJob findJob(long id, Connection conn);
+
+ /**
+ * Get all job objects.
+ *
+ * @param conn Connection to metadata repository
+ * @return List with all saved job objects
+ */
+ public List<MJob> findJobs(Connection conn);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index ef822c4..5580b4e 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.repository;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MFramework;
+import org.apache.sqoop.model.MJob;
import java.util.List;
@@ -94,7 +95,45 @@ public interface Repository {
/**
* Get all connection objects.
*
- * @return Array will all saved connection objects
+ * @return List with all saved connection objects
*/
public List<MConnection> findConnections();
+
+ /**
+ * Save given job to repository. This job object must not be already present
+ * in repository otherwise exception will be thrown.
+ *
+ * @param job Job object that should be saved to repository
+ */
+ public void createJob(MJob job);
+
+ /**
+ * Update given job metadata in repository. This object must already be saved
+ * in repository otherwise exception will be thrown.
+ *
+ * @param job Job object that should be updated in the repository
+ */
+ public void updateJob(MJob job);
+
+ /**
+ * Delete job with given id from metadata repository.
+ *
+ * @param id Job id that should be removed
+ */
+ public void deleteJob(long id);
+
+ /**
+ * Find job object with given id.
+ *
+ * @param id Job id
+ * @return Deserialized form of job loaded from repository
+ */
+ public MJob findJob(long id);
+
+ /**
+ * Get all job objects.
+ *
+ * @return List of all jobs in the repository
+ */
+ public List<MJob> findJobs();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
index 6d6f729..ca0c5a1 100644
--- a/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
+++ b/core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
@@ -90,6 +90,16 @@ public enum RepositoryError implements ErrorCode {
/** Invalid connection id **/
JDBCREPO_0017("Given connection id is invalid"),
+
+ /** Job that we're trying to create is already saved in repository **/
+ JDBCREPO_0018("Cannot create job that was already created"),
+
+ /** Job that we're trying to update is not yet saved **/
+ JDBCREPO_0019("Cannot update job that was not yet created"),
+
+ /** Invalid job id **/
+ JDBCREPO_0020("Given job id is invalid"),
+
;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
index 620561d..00bc926 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
@@ -64,10 +64,9 @@ public enum DerbyRepoError implements ErrorCode {
DERBYREPO_0011("Metadata cannot have preassigned persistence id"),
/**
- * The system was unable to register connector metadata due to an unexpected
- * update count.
+ * The system was unable to register various entities.
*/
- DERBYREPO_0012("Unexpected update count on connector registration"),
+ DERBYREPO_0012("Unexpected update count when registering entity"),
/**
* The system was unable to register metadata due to a failure to retrieve
@@ -126,6 +125,24 @@ public enum DerbyRepoError implements ErrorCode {
/** We're unable to check if given connection already exists */
DERBYREPO_0025("Unable to check if given connection exists"),
+ /** We can't create new job in metastore **/
+ DERBYREPO_0026("Unable to create new job data"),
+
+ /** We can't update job in metastore **/
+ DERBYREPO_0027("Unable to update job metadata in repository"),
+
+ /** We can't delete job in metastore **/
+ DERBYREPO_0028("Unable to delete job metadata in repository"),
+
+ /** We're unable to check if given job already exists */
+ DERBYREPO_0029("Unable to check if given job exists"),
+
+ /** We can't restore specific job metadata from metastore **/
+ DERBYREPO_0030("Unable to load specific job metadata from repository"),
+
+ /** We can't restore job metadata from metastore **/
+ DERBYREPO_0031("Unable to load job metadata from repository"),
+
;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index 75edb2a..c44270e 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -401,6 +401,9 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
return "values(1)"; // Yes, this is valid derby SQL
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void createConnection(MConnection connection, Connection conn) {
PreparedStatement stmt = null;
@@ -425,12 +428,14 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
long connectionId = rsetConnectionId.getLong(1);
- createConnectionInputs(connectionId,
- connection.getConnectorPart().getForms(),
- conn);
- createConnectionInputs(connectionId,
- connection.getFrameworkPart().getForms(),
- conn);
+ createInputValues(STMT_INSERT_CONNECTION_INPUT,
+ connectionId,
+ connection.getConnectorPart().getForms(),
+ conn);
+ createInputValues(STMT_INSERT_CONNECTION_INPUT,
+ connectionId,
+ connection.getFrameworkPart().getForms(),
+ conn);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0019, ex);
@@ -439,6 +444,9 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void updateConnection(MConnection connection, Connection conn) {
// We're not allowing updating values in SQ_CONNECTION (name, connector id)
@@ -453,12 +461,14 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
stmt.executeUpdate();
// And reinsert new ones
- createConnectionInputs(connection.getPersistenceId(),
- connection.getConnectorPart().getForms(),
- conn);
- createConnectionInputs(connection.getPersistenceId(),
- connection.getFrameworkPart().getForms(),
- conn);
+ createInputValues(STMT_INSERT_CONNECTION_INPUT,
+ connection.getPersistenceId(),
+ connection.getConnectorPart().getForms(),
+ conn);
+ createInputValues(STMT_INSERT_CONNECTION_INPUT,
+ connection.getPersistenceId(),
+ connection.getFrameworkPart().getForms(),
+ conn);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0021, ex);
@@ -467,6 +477,9 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public boolean existsConnection(long id, Connection conn) {
PreparedStatement stmt = null;
@@ -488,6 +501,9 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void deleteConnection(long id, Connection conn) {
PreparedStatement dltConn = null;
@@ -509,6 +525,9 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public MConnection findConnection(long id, Connection conn) {
PreparedStatement stmt = null;
@@ -533,6 +552,9 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public List<MConnection> findConnections(Connection conn) {
PreparedStatement stmt = null;
@@ -548,6 +570,175 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
}
}
+  /**
+   * {@inheritDoc}
+   *
+   * Inserts one row into the job table and then stores the non-empty
+   * connector and framework input values under the generated job id. Once
+   * everything has been written, the generated id is recorded on
+   * {@code job} so that the caller can see which id was assigned.
+   *
+   * @param job job to persist
+   * @param conn connection on which all statements are executed
+   * @throws SqoopException with DERBYREPO_0012 when the insert does not
+   *         affect exactly one row, DERBYREPO_0013 when no generated key is
+   *         returned, or DERBYREPO_0026 wrapping any SQLException
+   */
+  @Override
+  public void createJob(MJob job, Connection conn) {
+    PreparedStatement stmt = null;
+    ResultSet rsetJobId = null;
+    int result;
+    try {
+      stmt = conn.prepareStatement(STMT_INSERT_JOB,
+        Statement.RETURN_GENERATED_KEYS);
+      stmt.setString(1, job.getName());
+      stmt.setLong(2, job.getConnectionId());
+      stmt.setString(3, job.getType().name());
+
+      result = stmt.executeUpdate();
+      if (result != 1) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0012,
+          Integer.toString(result));
+      }
+
+      rsetJobId = stmt.getGeneratedKeys();
+
+      if (!rsetJobId.next()) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
+      }
+
+      long jobId = rsetJobId.getLong(1);
+
+      createInputValues(STMT_INSERT_JOB_INPUT,
+                        jobId,
+                        job.getConnectorPart().getForms(),
+                        conn);
+      createInputValues(STMT_INSERT_JOB_INPUT,
+                        jobId,
+                        job.getFrameworkPart().getForms(),
+                        conn);
+
+      // Record the generated key only after all rows were written, so a
+      // failed create never leaves an id on the in-memory object.
+      job.setPersistenceId(jobId);
+
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0026, ex);
+    } finally {
+      // Both helpers are called with possibly-null arguments elsewhere in
+      // this class, so no null guard is added here.
+      closeResultSets(rsetJobId);
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Replaces the stored input values of the job: all existing rows for the
+   * job are deleted and the current connector and framework values are
+   * inserted again.
+   */
+  @Override
+  public void updateJob(MJob job, Connection conn) {
+    // The SQ_JOB row itself (name, type, connection) is never modified here
+    // TODO(jarcec): Remove this limitation
+
+    PreparedStatement purgeStmt = null;
+    try {
+      // Step 1: drop every value currently stored for this job
+      purgeStmt = conn.prepareStatement(STMT_DELETE_JOB_INPUT);
+      final long jobId = job.getPersistenceId();
+      purgeStmt.setLong(1, jobId);
+      purgeStmt.executeUpdate();
+
+      // Step 2: write the connector values, then the framework values
+      createInputValues(STMT_INSERT_JOB_INPUT, jobId,
+                        job.getConnectorPart().getForms(), conn);
+      createInputValues(STMT_INSERT_JOB_INPUT, jobId,
+                        job.getFrameworkPart().getForms(), conn);
+
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0027, ex);
+    } finally {
+      closeStatements(purgeStmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Runs a count query against the job table and reports whether exactly
+   * one row carries the given id.
+   */
+  @Override
+  public boolean existsJob(long id, Connection conn) {
+    PreparedStatement checkStmt = null;
+    ResultSet countRs = null;
+    try {
+      checkStmt = conn.prepareStatement(STMT_SELECT_JOB_CHECK);
+      checkStmt.setLong(1, id);
+      countRs = checkStmt.executeQuery();
+
+      // A count(*) query is expected to always produce a row, so the
+      // result of next() is intentionally not inspected.
+      countRs.next();
+
+      final long matches = countRs.getLong(1);
+      return matches == 1;
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0029, ex);
+    } finally {
+      closeResultSets(countRs);
+      closeStatements(checkStmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Removes the stored input values of the job first and the job row
+   * itself afterwards.
+   */
+  @Override
+  public void deleteJob(long id, Connection conn) {
+    PreparedStatement jobPurge = null;
+    PreparedStatement inputPurge = null;
+    try {
+      // Both statements are prepared up front, before anything is deleted
+      inputPurge = conn.prepareStatement(STMT_DELETE_JOB_INPUT);
+      jobPurge = conn.prepareStatement(STMT_DELETE_JOB);
+
+      inputPurge.setLong(1, id);
+      jobPurge.setLong(1, id);
+
+      // Input rows go first, the job row goes last
+      inputPurge.executeUpdate();
+      jobPurge.executeUpdate();
+
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0028, ex);
+    } finally {
+      closeStatements(jobPurge, inputPurge);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Loads the job with the given id. Anything other than exactly one
+   * matching job is reported as DERBYREPO_0030.
+   */
+  @Override
+  public MJob findJob(long id, Connection conn) {
+    PreparedStatement lookupStmt = null;
+    try {
+      lookupStmt = conn.prepareStatement(STMT_SELECT_JOB_SINGLE);
+      lookupStmt.setLong(1, id);
+
+      List<MJob> found = loadJobs(lookupStmt, conn);
+
+      if (found.size() == 1) {
+        // Exactly one job matched, hand back that job object
+        return found.get(0);
+      }
+
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0030, "Couldn't find"
+        + " job with id " + id);
+
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex);
+    } finally {
+      closeStatements(lookupStmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Loads every job returned by the "select all jobs" statement.
+   */
+  @Override
+  public List<MJob> findJobs(Connection conn) {
+    PreparedStatement listStmt = null;
+    try {
+      listStmt = conn.prepareStatement(STMT_SELECT_JOB_ALL);
+
+      List<MJob> allJobs = loadJobs(listStmt, conn);
+      return allJobs;
+
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex);
+    } finally {
+      closeStatements(listStmt);
+    }
+  }
+
private List<MConnection> loadConnections(PreparedStatement stmt,
Connection conn)
throws SQLException {
@@ -572,8 +763,6 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
formFrameworkFetchStmt =
conn.prepareStatement(STMT_FETCH_FORM_FRAMEWORK);
- LOG.error(STMT_FETCH_CONNECTION_INPUT);
-
inputFetchStmt = conn.prepareStatement(STMT_FETCH_CONNECTION_INPUT);
//inputFetchStmt.setLong(1, XXX); // Will be filled by loadForms
inputFetchStmt.setLong(2, id);
@@ -609,6 +798,69 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
return connections;
}
+  /**
+   * Executes the given job query and builds one {@code MJob} per returned
+   * row, including its connector and framework job forms.
+   *
+   * The query must return its columns in this order: connector id, job id,
+   * job name, connection id, job type (the name of an {@code MJob.Type}
+   * constant).
+   *
+   * The helper statements are prepared and closed once per row. Previously
+   * they were re-prepared on every iteration while only the last set was
+   * closed, which leaked statements whenever more than one job was loaded.
+   *
+   * @param stmt prepared job query; it is executed here but not closed
+   * @param conn connection used to prepare the form and input statements
+   * @return loaded jobs in result set order, empty when there are no rows
+   * @throws SQLException if any statement fails
+   */
+  private List<MJob> loadJobs(PreparedStatement stmt,
+                              Connection conn)
+                              throws SQLException {
+    List<MJob> jobs = new ArrayList<MJob>();
+    ResultSet rsJob = null;
+
+    try {
+      rsJob = stmt.executeQuery();
+
+      while(rsJob.next()) {
+        long connectorId = rsJob.getLong(1);
+        long id = rsJob.getLong(2);
+        String name = rsJob.getString(3);
+        long connectionId = rsJob.getLong(4);
+        String stringType = rsJob.getString(5);
+
+        MJob.Type type = MJob.Type.valueOf(stringType);
+
+        // Scoped to a single row so that each one is closed before the
+        // next row prepares its own statements.
+        PreparedStatement formConnectorFetchStmt = null;
+        PreparedStatement formFrameworkFetchStmt = null;
+        PreparedStatement inputFetchStmt = null;
+
+        try {
+          formConnectorFetchStmt =
+            conn.prepareStatement(STMT_FETCH_FORM_CONNECTOR);
+          formConnectorFetchStmt.setLong(1, connectorId);
+
+          formFrameworkFetchStmt =
+            conn.prepareStatement(STMT_FETCH_FORM_FRAMEWORK);
+
+          inputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT);
+          //inputFetchStmt.setLong(1, XXX); // Will be filled by loadForms
+          inputFetchStmt.setLong(2, id);
+
+          // Connection-level forms are collected by loadForms as well, but
+          // they are not used when building a job.
+          List<MForm> connectorConnForms = new ArrayList<MForm>();
+          List<MForm> frameworkConnForms = new ArrayList<MForm>();
+
+          Map<MJob.Type, List<MForm>> connectorJobForms
+            = new HashMap<MJob.Type, List<MForm>>();
+          Map<MJob.Type, List<MForm>> frameworkJobForms
+            = new HashMap<MJob.Type, List<MForm>>();
+
+          loadForms(connectorConnForms, connectorJobForms,
+            formConnectorFetchStmt, inputFetchStmt);
+          loadForms(frameworkConnForms, frameworkJobForms,
+            formFrameworkFetchStmt, inputFetchStmt);
+
+          MJob job = new MJob(connectorId, connectionId, type,
+            new MJobForms(type, connectorJobForms.get(type)),
+            new MJobForms(type, frameworkJobForms.get(type)));
+
+          job.setPersistenceId(id);
+          job.setName(name);
+
+          jobs.add(job);
+        } finally {
+          closeStatements(formConnectorFetchStmt,
+            formFrameworkFetchStmt, inputFetchStmt);
+        }
+      }
+    } finally {
+      closeResultSets(rsJob);
+    }
+
+    return jobs;
+  }
+
/**
* Register forms in derby database.
*
@@ -644,7 +896,7 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
int baseFormCount = baseFormStmt.executeUpdate();
if (baseFormCount != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0015,
- new Integer(baseFormCount).toString());
+ Integer.toString(baseFormCount));
}
ResultSet rsetFormId = baseFormStmt.getGeneratedKeys();
if (!rsetFormId.next()) {
@@ -686,7 +938,7 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
int baseInputCount = baseInputStmt.executeUpdate();
if (baseInputCount != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0017,
- new Integer(baseInputCount).toString());
+ Integer.toString(baseInputCount));
}
ResultSet rsetInputId = baseInputStmt.getGeneratedKeys();
@@ -875,14 +1127,15 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
return ret;
}
- private void createConnectionInputs(long connectionId,
- List<MForm> forms,
- Connection conn) throws SQLException {
+ private void createInputValues(String query,
+ long id,
+ List<MForm> forms,
+ Connection conn) throws SQLException {
PreparedStatement stmt = null;
int result;
try {
- stmt = conn.prepareStatement(STMT_INSERT_CONNECTION_INPUT);
+ stmt = conn.prepareStatement(query);
for (MForm form : forms) {
for (MInput input : form.getInputs()) {
@@ -890,7 +1143,7 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
if (input.isEmpty()) {
continue;
}
- stmt.setLong(1, connectionId);
+ stmt.setLong(1, id);
stmt.setLong(2, input.getPersistenceId());
stmt.setString(3, input.getUrlSafeValueString());
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
index 5253219..95461c9 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
@@ -100,6 +100,8 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQB_NAME = "SQB_NAME";
+ public static final String COLUMN_SQB_TYPE = "SQB_TYPE";
+
public static final String COLUMN_SQB_CONNECTION = "SQB_CONNECTION";
// SQ_CONNECTION_INPUT
http://git-wip-us.apache.org/repos/asf/sqoop/blob/866d46df/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index 3cfc7f3..90f7a3d 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -86,6 +86,7 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
* +----------------------------+
* | SQB_ID: BIGINT PK AUTO-GEN |
* | SQB_NAME: VARCHAR(64) |
+ * | SQB_TYPE: VARCHAR(64) |
* | SQB_CONNECTION: BIGINT | FK SQ_CONNECTION(SQN_ID)
* +----------------------------+
* </pre>
@@ -162,8 +163,9 @@ public final class DerbySchemaQuery {
"CREATE TABLE " + TABLE_SQ_JOB + " (" + COLUMN_SQB_ID
+ " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) "
+ "PRIMARY KEY, " + COLUMN_SQB_CONNECTION + " BIGINT, " + COLUMN_SQB_NAME
- + " VARCHAR(32), FOREIGN KEY(" + COLUMN_SQB_CONNECTION + ") REFERENCES "
- + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + "))";
+ + " VARCHAR(64), " + COLUMN_SQB_TYPE + " VARCHAR(64), FOREIGN KEY("
+ + COLUMN_SQB_CONNECTION + ") REFERENCES " + TABLE_SQ_CONNECTION + " ("
+ + COLUMN_SQN_ID + "))";
// DDL: Create table SQ_CONNECTION_INPUT
public static final String QUERY_CREATE_TABLE_SQ_CONNECTION_INPUT =
@@ -226,6 +228,18 @@ public final class DerbySchemaQuery {
+ " = ? OR " + COLUMN_SQNI_CONNECTION + " IS NULL) ORDER BY "
+ COLUMN_SQI_INDEX;
+ // DML: Fetch inputs and values for a given job
+ // Parameters: 1 = form the inputs belong to (SQI_FORM),
+ //             2 = job whose stored values should be joined in (SQBI_JOB).
+ // Inputs are joined to the job input table with a LEFT OUTER JOIN and
+ // the rows are ordered by input index.
+ // NOTE(review): the job filter is part of WHERE rather than of the join
+ // condition. An input that has stored values only for other jobs looks
+ // like it would be filtered out completely instead of being returned
+ // with a NULL value - confirm. Moving the filter into the ON clause
+ // would fix that, but it swaps the order of the two parameters.
+ public static final String STMT_FETCH_JOB_INPUT =
+ "SELECT " + COLUMN_SQI_ID + ", " + COLUMN_SQI_NAME + ", "
+ + COLUMN_SQI_FORM + ", " + COLUMN_SQI_INDEX + ", " + COLUMN_SQI_TYPE
+ + ", " + COLUMN_SQI_STRMASK + ", " + COLUMN_SQI_STRLENGTH
+ + ", " + COLUMN_SQBI_VALUE + " FROM " + TABLE_SQ_INPUT
+ + " LEFT OUTER JOIN " + TABLE_SQ_JOB_INPUT + " ON "
+ + COLUMN_SQBI_INPUT + " = " + COLUMN_SQI_ID + " WHERE "
+ + COLUMN_SQI_FORM + " = ? AND (" + COLUMN_SQBI_JOB
+ + " = ? OR " + COLUMN_SQBI_JOB + " IS NULL) ORDER BY "
+ + COLUMN_SQI_INDEX;
+
// DML: Insert connector base
public static final String STMT_INSERT_CONNECTOR_BASE =
"INSERT INTO " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_NAME
@@ -270,26 +284,54 @@ public final class DerbySchemaQuery {
+ COLUMN_SQN_CONNECTOR + " FROM " + TABLE_SQ_CONNECTION + " WHERE "
+ COLUMN_SQN_ID + " = ?";
- // DML: Select one specific connection
+ // DML: Select all connections
public static final String STMT_SELECT_CONNECTION_ALL =
"SELECT " + COLUMN_SQN_ID + ", " + COLUMN_SQN_NAME + ", "
+ COLUMN_SQN_CONNECTOR + " FROM " + TABLE_SQ_CONNECTION;
- // DML: Select all inputs for given connection
- public static final String STMT_SELECT_CONNECTION_INPUT =
- "SELECT " + COLUMN_SQF_ID + ", " + COLUMN_SQI_ID + ", " + COLUMN_SQNI_VALUE
- + " FROM " + TABLE_SQ_CONNECTION_INPUT + " JOIN " + TABLE_SQ_INPUT
- + " JOIN " + TABLE_SQ_FORM + " ON " + COLUMN_SQF_ID + " = "
- + COLUMN_SQI_FORM + " ON " + COLUMN_SQI_ID + " = " + COLUMN_SQNI_INPUT
- + " WHERE " + COLUMN_SQNI_CONNECTION + " = ? ORDER BY "
- + COLUMN_SQF_CONNECTOR + ", " + COLUMN_SQF_INDEX
- + ", " + COLUMN_SQI_INDEX ;
-
// DML: Check if given connection exists
public static final String STMT_SELECT_CONNECTION_CHECK =
"SELECT count(*) FROM " + TABLE_SQ_CONNECTION + " WHERE " + COLUMN_SQN_ID
+ " = ?";
+ // DML: Insert new job
+ // Parameters: 1 = job name, 2 = connection id, 3 = job type.
+ public static final String STMT_INSERT_JOB =
+ "INSERT INTO " + TABLE_SQ_JOB + " (" + COLUMN_SQB_NAME + ", "
+ + COLUMN_SQB_CONNECTION + ", " + COLUMN_SQB_TYPE + ") VALUES (?, ?, ?)";
+
+ // DML: Insert new job inputs
+ // Parameters: 1 = job id, 2 = input id, 3 = input value.
+ public static final String STMT_INSERT_JOB_INPUT =
+ "INSERT INTO " + TABLE_SQ_JOB_INPUT + " (" + COLUMN_SQBI_JOB
+ + ", " + COLUMN_SQBI_INPUT + ", " + COLUMN_SQBI_VALUE + ") "
+ + "VALUES (?, ?, ?)";
+
+ // DML: Delete rows from job input table
+ // Parameters: 1 = job id; removes every stored input value of that job.
+ public static final String STMT_DELETE_JOB_INPUT =
+ "DELETE FROM " + TABLE_SQ_JOB_INPUT + " WHERE " + COLUMN_SQBI_JOB + " = ?";
+
+ // DML: Delete row from job table
+ // Parameters: 1 = job id.
+ public static final String STMT_DELETE_JOB =
+ "DELETE FROM " + TABLE_SQ_JOB + " WHERE " + COLUMN_SQB_ID + " = ?";
+
+ // DML: Check if given job exists
+ // Parameters: 1 = job id; returns a single count(*) column.
+ public static final String STMT_SELECT_JOB_CHECK =
+ "SELECT count(*) FROM " + TABLE_SQ_JOB + " WHERE " + COLUMN_SQB_ID + " = ?";
+
+ // DML: Select one specific job
+ // Parameters: 1 = job id.
+ // Result columns, in order: connector id (taken from the joined
+ // connection row), job id, job name, connection id, job type.
+ public static final String STMT_SELECT_JOB_SINGLE =
+ "SELECT " + COLUMN_SQN_CONNECTOR + ", " + COLUMN_SQB_ID + ", "
+ + COLUMN_SQB_NAME + ", " + COLUMN_SQB_CONNECTION + ", " + COLUMN_SQB_TYPE
+ + " FROM " + TABLE_SQ_JOB + " LEFT JOIN " + TABLE_SQ_CONNECTION + " ON "
+ + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID + " WHERE "
+ + COLUMN_SQB_ID + " = ?";
+
+ // DML: Select all jobs
+ // No parameters. Result columns are in the same order as for
+ // STMT_SELECT_JOB_SINGLE: connector id, job id, job name, connection id,
+ // job type.
+ public static final String STMT_SELECT_JOB_ALL =
+ "SELECT " + COLUMN_SQN_CONNECTOR + ", " + COLUMN_SQB_ID + ", "
+ + COLUMN_SQB_NAME + ", " + COLUMN_SQB_CONNECTION + ", " + COLUMN_SQB_TYPE
+ + " FROM " + TABLE_SQ_JOB + " LEFT JOIN " + TABLE_SQ_CONNECTION + " ON "
+ + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID;
+
private DerbySchemaQuery() {
// Disable explicit object creation
}