You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2012/11/02 22:58:28 UTC
[4/4] SQOOP-656 End to end submission engine (Jarek Jarcec Cecho)
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
new file mode 100644
index 0000000..3433b20
--- /dev/null
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.derby;
+
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.submission.SubmissionStatus;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+/**
+ *
+ */
+public class TestSubmissionHandling extends DerbyTestCase {
+
+ DerbyRepositoryHandler handler;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ handler = new DerbyRepositoryHandler();
+
+ // We always needs schema for this test case
+ createSchema();
+
+ // We always needs connector and framework structures in place
+ loadConnectorAndFramework();
+
+ // We also always needs connection metadata in place
+ loadConnections();
+
+ // And finally we always needs job metadata in place
+ loadJobs();
+ }
+
+ public void testFindSubmissionsUnfinished() throws Exception {
+ List<MSubmission> submissions;
+
+ submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+ assertNotNull(submissions);
+ assertEquals(0, submissions.size());
+
+ loadSubmissions();
+
+ submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+ assertNotNull(submissions);
+ assertEquals(2, submissions.size());
+ }
+
+ public void testExistsSubmission() throws Exception {
+ // There shouldn't be anything on empty repository
+ assertFalse(handler.existsSubmission(1, getDerbyConnection()));
+ assertFalse(handler.existsSubmission(2, getDerbyConnection()));
+ assertFalse(handler.existsSubmission(3, getDerbyConnection()));
+ assertFalse(handler.existsSubmission(4, getDerbyConnection()));
+ assertFalse(handler.existsSubmission(5, getDerbyConnection()));
+ assertFalse(handler.existsSubmission(6, getDerbyConnection()));
+
+ loadSubmissions();
+
+ assertTrue(handler.existsSubmission(1, getDerbyConnection()));
+ assertTrue(handler.existsSubmission(2, getDerbyConnection()));
+ assertTrue(handler.existsSubmission(3, getDerbyConnection()));
+ assertTrue(handler.existsSubmission(4, getDerbyConnection()));
+ assertTrue(handler.existsSubmission(5, getDerbyConnection()));
+ assertFalse(handler.existsSubmission(6, getDerbyConnection()));
+ }
+
+ public void testCreateSubmission() throws Exception {
+ MSubmission submission =
+ new MSubmission(1, new Date(), SubmissionStatus.RUNNING, "job-x");
+
+ handler.createSubmission(submission, getDerbyConnection());
+
+ assertEquals(1, submission.getPersistenceId());
+ assertCountForTable("SQOOP.SQ_SUBMISSION", 1);
+
+ List<MSubmission> submissions =
+ handler.findSubmissionsUnfinished(getDerbyConnection());
+ assertNotNull(submissions);
+ assertEquals(1, submissions.size());
+
+ submission = submissions.get(0);
+
+ assertEquals(1, submission.getJobId());
+ assertEquals(SubmissionStatus.RUNNING, submission.getStatus());
+ assertEquals("job-x", submission.getExternalId());
+
+ // Let's create second connection
+ submission =
+ new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED, "job-x");
+ handler.createSubmission(submission, getDerbyConnection());
+
+ assertEquals(2, submission.getPersistenceId());
+ assertCountForTable("SQOOP.SQ_SUBMISSION", 2);
+ }
+
+ public void testUpdateConnection() throws Exception {
+ loadSubmissions();
+
+ List<MSubmission> submissions =
+ handler.findSubmissionsUnfinished(getDerbyConnection());
+ assertNotNull(submissions);
+ assertEquals(2, submissions.size());
+
+ MSubmission submission = submissions.get(0);
+ submission.setStatus(SubmissionStatus.SUCCEEDED);
+
+ handler.updateSubmission(submission, getDerbyConnection());
+
+ submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+ assertNotNull(submissions);
+ assertEquals(1, submissions.size());
+ }
+
+ public void testPurgeSubmissions() throws Exception {
+ loadSubmissions();
+ List<MSubmission> submissions;
+
+ submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+ assertNotNull(submissions);
+ assertEquals(2, submissions.size());
+ assertCountForTable("SQOOP.SQ_SUBMISSION", 5);
+
+ Calendar calendar = Calendar.getInstance();
+ // 2012-01-03 05:05:05
+ calendar.set(2012, Calendar.JANUARY, 3, 5, 5, 5);
+ handler.purgeSubmissions(calendar.getTime(), getDerbyConnection());
+
+ submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+ assertNotNull(submissions);
+ assertEquals(1, submissions.size());
+ assertCountForTable("SQOOP.SQ_SUBMISSION", 2);
+
+ handler.purgeSubmissions(new Date(), getDerbyConnection());
+
+ submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+ assertNotNull(submissions);
+ assertEquals(0, submissions.size());
+ assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
+
+ handler.purgeSubmissions(new Date(), getDerbyConnection());
+
+ submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+ assertNotNull(submissions);
+ assertEquals(0, submissions.size());
+ assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 78ad8ee..71aa6c9 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -46,6 +46,12 @@ limitations under the License.
</dependency>
<dependency>
+ <groupId>org.apache.sqoop.submission</groupId>
+ <artifactId>sqoop-submission-mapreduce</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.sqoop.repository</groupId>
<artifactId>sqoop-repository-derby</artifactId>
<version>2.0.0-SNAPSHOT</version>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
index eba334e..64ef84a 100644
--- a/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
@@ -33,7 +33,7 @@ import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.server.RequestContext;
import org.apache.sqoop.server.RequestHandler;
import org.apache.sqoop.server.common.ServerError;
-import org.apache.sqoop.utils.ClassLoadingUtils;
+import org.apache.sqoop.utils.ClassUtils;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.Validation;
import org.apache.sqoop.validation.Validator;
@@ -158,9 +158,9 @@ public class ConnectionRequestHandler implements RequestHandler {
Validator frameworkValidator = FrameworkManager.getValidator();
// We need translate forms to configuration objects
- Object connectorConfig = ClassLoadingUtils.instantiate(
+ Object connectorConfig = ClassUtils.instantiate(
connector.getConnectionConfigurationClass());
- Object frameworkConfig = ClassLoadingUtils.instantiate(
+ Object frameworkConfig = ClassUtils.instantiate(
FrameworkManager.getConnectionConfigurationClass());
FormUtils.fillValues(
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java
index fda91fd..8a52243 100644
--- a/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java
@@ -60,7 +60,7 @@ public class ConnectorRequestHandler implements RequestHandler {
Long id = Long.parseLong(cid);
// Check that user is not asking for non existing connector id
- if(!ConnectorManager.getConnectoIds().contains(id)) {
+ if(!ConnectorManager.getConnectorIds().contains(id)) {
throw new SqoopException(ServerError.SERVER_0004, "Invalid id " + id);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
index 0589e30..070b290 100644
--- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
@@ -33,7 +33,7 @@ import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.server.RequestContext;
import org.apache.sqoop.server.RequestHandler;
import org.apache.sqoop.server.common.ServerError;
-import org.apache.sqoop.utils.ClassLoadingUtils;
+import org.apache.sqoop.utils.ClassUtils;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.Validation;
import org.apache.sqoop.validation.Validator;
@@ -159,10 +159,10 @@ public class JobRequestHandler implements RequestHandler {
Validator frameworkValidator = FrameworkManager.getValidator();
// We need translate forms to configuration objects
- Object connectorConfig = ClassLoadingUtils.instantiate(
- connector.getConnectionConfigurationClass());
- Object frameworkConfig = ClassLoadingUtils.instantiate(
- FrameworkManager.getConnectionConfigurationClass());
+ Object connectorConfig = ClassUtils.instantiate(
+ connector.getJobConfigurationClass(job.getType()));
+ Object frameworkConfig = ClassUtils.instantiate(
+ FrameworkManager.getJobConfigurationClass(job.getType()));
FormUtils.fillValues(job.getConnectorPart().getForms(), connectorConfig);
FormUtils.fillValues(job.getFrameworkPart().getForms(), frameworkConfig);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
new file mode 100644
index 0000000..e9e6551
--- /dev/null
+++ b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.framework.FrameworkManager;
+import org.apache.sqoop.json.JsonBean;
+import org.apache.sqoop.json.SubmissionBean;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.server.RequestContext;
+import org.apache.sqoop.server.RequestHandler;
+import org.apache.sqoop.server.common.ServerError;
+
+/**
+ * Submission request handler is supporting following resources:
+ *
+ * GET /v1/submission/action/:jid
+ * Get status of last submission for job with id :jid
+ *
+ * POST /v1/submission/action/:jid
+ * Create new submission for job with id :jid
+ *
+ * DELETE /v1/submission/action/:jid
+ * Stop last submission for job with id :jid
+ *
+ * Possible additions in the future: /v1/submission/history/* for history.
+ */
+public class SubmissionRequestHandler implements RequestHandler {
+
+ private final Logger logger = Logger.getLogger(getClass());
+
+ public SubmissionRequestHandler() {
+ logger.info("SubmissionRequestHandler initialized");
+ }
+
+ @Override
+ public JsonBean handleEvent(RequestContext ctx) {
+ String[] urlElements = ctx.getUrlElements();
+ if (urlElements.length < 2) {
+ throw new SqoopException(ServerError.SERVER_0003,
+ "Invalid URL, too few arguments for this servlet.");
+ }
+
+ // Let's check
+ int length = urlElements.length;
+ String action = urlElements[length - 2];
+
+ if(action.equals("action")) {
+ return handleActionEvent(ctx, urlElements[length - 1]);
+ }
+
+ throw new SqoopException(ServerError.SERVER_0003,
+ "Do not know what to do.");
+ }
+
+ private JsonBean handleActionEvent(RequestContext ctx, String sjid) {
+ long jid = Long.parseLong(sjid);
+
+ switch (ctx.getMethod()) {
+ case GET:
+ return submissionStatus(jid);
+ case POST:
+ return submissionSubmit(jid);
+ case DELETE:
+ return submissionStop(jid);
+ }
+
+ return null;
+ }
+
+ private JsonBean submissionStop(long jid) {
+ MSubmission submission = FrameworkManager.stop(jid);
+ return new SubmissionBean(submission);
+ }
+
+ private JsonBean submissionSubmit(long jid) {
+ MSubmission submission = FrameworkManager.submit(jid);
+ return new SubmissionBean(submission);
+ }
+
+ private JsonBean submissionStatus(long jid) {
+ MSubmission submission = FrameworkManager.status(jid);
+ return new SubmissionBean(submission);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/server/RequestContext.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/server/RequestContext.java b/server/src/main/java/org/apache/sqoop/server/RequestContext.java
index 78950f6..c6b6569 100644
--- a/server/src/main/java/org/apache/sqoop/server/RequestContext.java
+++ b/server/src/main/java/org/apache/sqoop/server/RequestContext.java
@@ -85,6 +85,13 @@ public class RequestContext {
}
/**
+ * Return all elements in the url as an array
+ */
+ public String[] getUrlElements() {
+ return getRequest().getRequestURI().split("/");
+ }
+
+ /**
* Get locale specified in accept-language HTTP header.
*
* @return First specified locale
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
index 993c153..ae0735b 100644
--- a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
+++ b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
@@ -37,18 +37,22 @@ public class ServerInitializer implements ServletContextListener {
Logger.getLogger(ServerInitializer.class);
public void contextDestroyed(ServletContextEvent arg0) {
+ LOG.info("Shutting down Sqoop server");
FrameworkManager.destroy();
ConnectorManager.destroy();
RepositoryManager.destroy();
SqoopConfiguration.destroy();
+ LOG.info("Sqoop server has been correctly terminated");
}
public void contextInitialized(ServletContextEvent arg0) {
try {
+ LOG.info("Booting up Sqoop server");
SqoopConfiguration.initialize();
RepositoryManager.initialize();
ConnectorManager.initialize();
FrameworkManager.initialize();
+ LOG.info("Sqoop server has successfully boot up");
} catch (RuntimeException ex) {
LOG.error("Server startup failure", ex);
throw ex;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java b/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java
new file mode 100644
index 0000000..7252e11
--- /dev/null
+++ b/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.server.v1;
+
+import org.apache.sqoop.handler.SubmissionRequestHandler;
+import org.apache.sqoop.json.JsonBean;
+import org.apache.sqoop.server.RequestContext;
+import org.apache.sqoop.server.RequestHandler;
+import org.apache.sqoop.server.SqoopProtocolServlet;
+
+/**
+ *
+ */
+public class SubmissionServlet extends SqoopProtocolServlet {
+
+ private RequestHandler requestHandler;
+
+ public SubmissionServlet() {
+ requestHandler = new SubmissionRequestHandler();
+ }
+
+ @Override
+ protected JsonBean handleGetRequest(RequestContext ctx) throws Exception {
+ return requestHandler.handleEvent(ctx);
+ }
+
+ @Override
+ protected JsonBean handlePostRequest(RequestContext ctx) throws Exception {
+ return requestHandler.handleEvent(ctx);
+ }
+
+ @Override
+ protected JsonBean handleDeleteRequest(RequestContext ctx) throws Exception {
+ return requestHandler.handleEvent(ctx);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/server/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/server/src/main/webapp/WEB-INF/web.xml b/server/src/main/webapp/WEB-INF/web.xml
index 69229bf..f053062 100644
--- a/server/src/main/webapp/WEB-INF/web.xml
+++ b/server/src/main/webapp/WEB-INF/web.xml
@@ -87,5 +87,18 @@ limitations under the License.
<url-pattern>/v1/job/*</url-pattern>
</servlet-mapping>
+ <!-- Submission servlet -->
+ <servlet>
+ <servlet-name>v1.SubmissionServlet</servlet-name>
+ <servlet-class>org.apache.sqoop.server.v1.SubmissionServlet</servlet-class>
+ <load-on-startup>1</load-on-startup>
+ </servlet>
+
+ <servlet-mapping>
+ <servlet-name>v1.SubmissionServlet</servlet-name>
+ <url-pattern>/v1/submission/*</url-pattern>
+ </servlet-mapping>
+
+
</web-app>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java b/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java
new file mode 100644
index 0000000..59a9457
--- /dev/null
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+/**
+ * Set of default callbacks that must be implement by each job type.
+ */
+public abstract class CallbackBase {
+
+ private Class<? extends Initializer> initializer;
+ private Class<? extends Destroyer> destroyer;
+
+ public CallbackBase(
+ Class<? extends Initializer> initializer,
+ Class<? extends Destroyer> destroyer
+ ) {
+ this.initializer = initializer;
+ this.destroyer = destroyer;
+ }
+
+ public Class<? extends Destroyer> getDestroyer() {
+ return destroyer;
+ }
+
+ public Class<? extends Initializer> getInitializer() {
+ return initializer;
+ }
+
+ @Override
+ public String toString() {
+ return "initializer=" + initializer.getName() +
+ ", destroyer=" + destroyer.getName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Context.java b/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
deleted file mode 100644
index fc01c96..0000000
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-/**
- * The context for getting configuration values.
- */
-public interface Context {
-
- String getString(String key);
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
index af766f3..37b9f1b 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
@@ -17,12 +17,15 @@
*/
package org.apache.sqoop.job.etl;
+import org.apache.sqoop.common.MapContext;
+
/**
* This allows connector to define work to complete execution, for example,
* resource cleaning.
*/
public abstract class Destroyer {
- public abstract void run(Context context);
+ // TODO(Jarcec): This should be called with ImmutableContext
+ public abstract void run(MapContext context);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java b/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
index ef690bf..cdaa623 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
@@ -25,32 +25,27 @@ package org.apache.sqoop.job.etl;
* -> Loader
* -> Destroyer
*/
-public class Exporter {
+public class Exporter extends CallbackBase {
- private Class<? extends Initializer> initializer;
private Class<? extends Loader> loader;
- private Class<? extends Destroyer> destroyer;
public Exporter(
Class<? extends Initializer> initializer,
Class<? extends Loader> loader,
Class<? extends Destroyer> destroyer
) {
- this.initializer = initializer;
+ super(initializer, destroyer);
this.loader = loader;
- this.destroyer = destroyer;
- }
-
- public Class<? extends Initializer> getInitializer() {
- return initializer;
}
public Class<? extends Loader> getLoader() {
return loader;
}
- public Class<? extends Destroyer> getDestroyer() {
- return destroyer;
+ @Override
+ public String toString() {
+ return "Exporter{" + super.toString() +
+ ", loader=" + loader +
+ '}';
}
-
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
index 20bdeda..ba04be9 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
@@ -17,6 +17,7 @@
*/
package org.apache.sqoop.job.etl;
+import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.io.DataWriter;
/**
@@ -25,7 +26,10 @@ import org.apache.sqoop.job.io.DataWriter;
*/
public abstract class Extractor {
- public abstract void run(Context context,
- Partition partition, DataWriter writer);
+ public abstract void run(ImmutableContext context,
+ Object connectionConfiguration,
+ Object jobConfiguration,
+ Partition partition,
+ DataWriter writer);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
index f0a8d1a..d4c9e70 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
@@ -26,25 +26,18 @@ package org.apache.sqoop.job.etl;
* -> (framework-defined steps)
* -> Destroyer
*/
-public class Importer {
+public class Importer extends CallbackBase {
- private Class<? extends Initializer> initializer;
private Class<? extends Partitioner> partitioner;
private Class<? extends Extractor> extractor;
- private Class<? extends Destroyer> destroyer;
public Importer(Class<? extends Initializer> initializer,
Class<? extends Partitioner> partitioner,
Class<? extends Extractor> extractor,
Class<? extends Destroyer> destroyer) {
- this.initializer = initializer;
+ super(initializer, destroyer);
this.partitioner = partitioner;
this.extractor = extractor;
- this.destroyer = destroyer;
- }
-
- public Class<? extends Initializer> getInitializer() {
- return initializer;
}
public Class<? extends Partitioner> getPartitioner() {
@@ -55,8 +48,11 @@ public class Importer {
return extractor;
}
- public Class<? extends Destroyer> getDestroyer() {
- return destroyer;
+ @Override
+ public String toString() {
+ return "Importer{" + super.toString() +
+ ", partitioner=" + partitioner.getName() +
+ ", extractor=" + extractor.getName() +
+ '}';
}
-
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
index 75bd42e..2092815 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
@@ -17,12 +17,42 @@
*/
package org.apache.sqoop.job.etl;
+import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.common.MutableMapContext;
+
+import java.util.LinkedList;
+import java.util.List;
+
/**
* This allows connector to define initialization work for execution,
* for example, context configuration.
*/
public abstract class Initializer {
- public abstract void run(MutableContext context, Options options);
+ /**
+ * Initialize new submission based on given configuration properties. Any
+ * needed temporary values might be saved to context object and they will be
+ * promoted to all other part of the workflow automatically.
+ *
+ * @param context Changeable context object, purely for connector usage
+ * @param connectionConfiguration Connector's connection configuration object
+ * @param jobConfiguration Connector's job configuration object
+ */
+ public abstract void initialize(MutableMapContext context,
+ Object connectionConfiguration,
+ Object jobConfiguration);
+
+ /**
+ * Return list of all jars that this particular connector needs to operate
+ * on following job. This method will be called after running initialize
+ * method.
+ *
+ * @return
+ */
+ public List<String> getJars(MapContext context,
+ Object connectionConfiguration,
+ Object jobConfiguration) {
+ return new LinkedList<String>();
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
index 5474927..3a708df 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
@@ -17,6 +17,7 @@
*/
package org.apache.sqoop.job.etl;
+import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.io.DataReader;
/**
@@ -24,6 +25,6 @@ import org.apache.sqoop.job.io.DataReader;
*/
public abstract class Loader {
- public abstract void run(Context context, DataReader reader);
+ public abstract void run(ImmutableContext context, DataReader reader);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java b/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java
deleted file mode 100644
index 03678c5..0000000
--- a/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-/**
- * The context for getting and setting configuration values.
- */
-public interface MutableContext extends Context {
-
- void setString(String key, String value);
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Options.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Options.java b/spi/src/main/java/org/apache/sqoop/job/etl/Options.java
deleted file mode 100644
index 2dc4671..0000000
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Options.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-/**
- * The options provided from user input.
- */
-public interface Options {
-
- public String getOption(String key);
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java b/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java
index 8834c80..db07844 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java
@@ -36,4 +36,11 @@ public abstract class Partition {
*/
public abstract void write(DataOutput out) throws IOException;
+ /**
+ * Each partition must be easily serializable to human readable form so that
+ * it can be logged for debugging purpose.
+ *
+ * @return Human readable representation
+ */
+ public abstract String toString();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
index 21310be..3a525c4 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
@@ -17,6 +17,8 @@
*/
package org.apache.sqoop.job.etl;
+import org.apache.sqoop.common.ImmutableContext;
+
import java.util.List;
/**
@@ -25,6 +27,8 @@ import java.util.List;
*/
public abstract class Partitioner {
- public abstract List<Partition> run(Context context);
+ public abstract List<Partition> getPartitions(ImmutableContext context,
+ Object connectionConfiguration,
+ Object jobConfiguration);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/submission/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/submission/mapreduce/pom.xml b/submission/mapreduce/pom.xml
new file mode 100644
index 0000000..03c06c0
--- /dev/null
+++ b/submission/mapreduce/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>submission</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.sqoop.submission</groupId>
+ <artifactId>sqoop-submission-mapreduce</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <name>Sqoop Mapreduce Submission Engine</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>sqoop-core</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>sqoop-core</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/Constants.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/Constants.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/Constants.java
new file mode 100644
index 0000000..e562701
--- /dev/null
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/Constants.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.submission.mapreduce;
+
+/**
+ * Configuration constants for Mapreduce submission engine
+ */
+public class Constants {
+
+ public static final String PREFIX_MAPREDUCE = "mapreduce.";
+
+ public static final String CONF_CONFIG_DIR =
+ PREFIX_MAPREDUCE + "configuration.directory";
+
+ private Constants() {
+ // Instantiation is prohibited
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
new file mode 100644
index 0000000..7049924
--- /dev/null
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.submission.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.framework.SubmissionRequest;
+import org.apache.sqoop.framework.SubmissionEngine;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.model.FormUtils;
+import org.apache.sqoop.submission.counter.Counters;
+import org.apache.sqoop.submission.SubmissionStatus;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.Map;
+
+
+/**
+ * This is very simple and straightforward implementation of map-reduce based
+ * submission engine.
+ */
+public class MapreduceSubmissionEngine extends SubmissionEngine {
+
+
+ private static Logger LOG = Logger.getLogger(MapreduceSubmissionEngine.class);
+
+ /**
+ * Global configuration object that is build from hadoop configuration files
+ * on engine initialization and cloned during each new submission creation.
+ */
+ private Configuration globalConfiguration;
+
+ /**
+ * Job client that is configured to talk to one specific Job tracker.
+ */
+ private JobClient jobClient;
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void initialize(MapContext context, String prefix) {
+ LOG.info("Initializing Map-reduce Submission Engine");
+
+ // Build global configuration, start with empty configuration object
+ globalConfiguration = new Configuration();
+ globalConfiguration.clear();
+
+ // Load configured hadoop configuration directory
+ String configDirectory = context.getString(prefix + Constants.CONF_CONFIG_DIR);
+
+ // Git list of files ending with "-site.xml" (configuration files)
+ File dir = new File(configDirectory);
+ String [] files = dir.list(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.endsWith("-site.xml");
+ }
+ });
+
+ // Add each such file to our global configuration object
+ for (String file : files) {
+ LOG.info("Found hadoop configuration file " + file);
+ try {
+ globalConfiguration.addResource(new File(configDirectory, file).toURI().toURL());
+ } catch (MalformedURLException e) {
+ LOG.error("Can't load configuration file: " + file, e);
+ }
+ }
+
+ // Create job client
+ try {
+ jobClient = new JobClient(new Configuration(globalConfiguration));
+ } catch (IOException e) {
+ throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0002, e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void destroy() {
+ LOG.info("Destroying Mapreduce Submission Engine");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public boolean submit(SubmissionRequest request) {
+ // Clone global configuration
+ Configuration configuration = new Configuration(globalConfiguration);
+
+ // Serialize framework context into job configuration
+ for(Map.Entry<String, String> entry: request.getFrameworkContext()) {
+ configuration.set(entry.getKey(), entry.getValue());
+ }
+
+ // Serialize connector context as a sub namespace
+ for(Map.Entry<String, String> entry :request.getConnectorContext()) {
+ configuration.set(
+ JobConstants.PREFIX_CONNECTOR_CONTEXT + entry.getKey(),
+ entry.getValue());
+ }
+
+ // Serialize configuration objects - Firstly configuration classes
+ configuration.set(JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,
+ request.getConfigConnectorConnection().getClass().getName());
+ configuration.set(JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
+ request.getConfigConnectorJob().getClass().getName());
+ configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION,
+ request.getConfigFrameworkConnection().getClass().getName());
+ configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
+ request.getConfigFrameworkJob().getClass().getName());
+
+ // And finally configuration data
+ configuration.set(JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION,
+ FormUtils.toJson(request.getConfigConnectorConnection()));
+ configuration.set(JobConstants.JOB_CONFIG_CONNECTOR_JOB,
+ FormUtils.toJson(request.getConfigConnectorJob()));
+ configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION,
+ FormUtils.toJson(request.getConfigFrameworkConnection()));
+ configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_JOB,
+ FormUtils.toJson(request.getConfigFrameworkConnection()));
+
+ // Promote all required jars to the job
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for(String jar : request.getJars()) {
+ if(first) {
+ first = false;
+ } else {
+ sb.append(",");
+ }
+ LOG.debug("Adding jar to the job: " + jar);
+ sb.append(jar);
+ }
+ configuration.set("tmpjars", sb.toString());
+
+ try {
+ Job job = Job.getInstance(configuration);
+ job.setJobName(request.getJobName());
+
+ job.setInputFormatClass(request.getInputFormatClass());
+
+ job.setMapperClass(request.getMapperClass());
+ job.setMapOutputKeyClass(request.getMapOutputKeyClass());
+ job.setMapOutputValueClass(request.getMapOutputValueClass());
+
+ String outputDirectory = request.getOutputDirectory();
+ if(outputDirectory != null) {
+ FileOutputFormat.setOutputPath(job, new Path(outputDirectory));
+ }
+
+ // TODO(jarcec): Harcoded no reducers
+ job.setNumReduceTasks(0);
+
+ job.setOutputFormatClass(request.getOutputFormatClass());
+ job.setOutputKeyClass(request.getOutputKeyClass());
+ job.setOutputValueClass(request.getOutputValueClass());
+
+ job.submit();
+
+ String jobId = job.getJobID().toString();
+ request.getSummary().setExternalId(jobId);
+ request.getSummary().setExternalLink(job.getTrackingURL());
+
+ LOG.debug("Executed new map-reduce job with id " + jobId);
+ } catch (Exception e) {
+ LOG.error("Error in submitting job", e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void stop(String submissionId) {
+ try {
+ RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId));
+ if(runningJob == null) {
+ return;
+ }
+
+ runningJob.killJob();
+ } catch (IOException e) {
+ throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SubmissionStatus status(String submissionId) {
+ try {
+ RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId));
+ if(runningJob == null) {
+ return SubmissionStatus.UNKNOWN;
+ }
+
+ int status = runningJob.getJobState();
+ return convertMapreduceState(status);
+
+ } catch (IOException e) {
+ throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public double progress(String submissionId) {
+ try {
+ // Get some reasonable approximation of map-reduce job progress
+ // TODO(jarcec): What if we're running without reducers?
+ RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId));
+ if(runningJob == null) {
+ // Return default value
+ return super.progress(submissionId);
+ }
+
+ return (runningJob.mapProgress() + runningJob.reduceProgress()) / 2;
+ } catch (IOException e) {
+ throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Counters stats(String submissionId) {
+ //TODO(jarcec): Not supported yet
+ return super.stats(submissionId);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String externalLink(String submissionId) {
+ try {
+ RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId));
+ if(runningJob == null) {
+ return null;
+ }
+
+ return runningJob.getTrackingURL();
+ } catch (IOException e) {
+ throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
+ }
+ }
+
+ /**
+ * Convert map-reduce specific job status constants to Sqoop job status
+ * constants.
+ *
+ * @param status Map-reduce job constant
+ * @return Equivalent submission status
+ */
+ protected SubmissionStatus convertMapreduceState(int status) {
+ if(status == JobStatus.PREP) {
+ return SubmissionStatus.BOOTING;
+ } else if (status == JobStatus.RUNNING) {
+ return SubmissionStatus.RUNNING;
+ } else if (status == JobStatus.FAILED) {
+ return SubmissionStatus.FAILED;
+ } else if (status == JobStatus.KILLED) {
+ return SubmissionStatus.FAILED;
+ } else if (status == JobStatus.SUCCEEDED) {
+ return SubmissionStatus.SUCCEEDED;
+ }
+
+ throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0004,
+ "Unknown status " + status);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionError.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionError.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionError.java
new file mode 100644
index 0000000..9296717
--- /dev/null
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionError.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.submission.mapreduce;
+
+import org.apache.sqoop.common.ErrorCode;
+
+/**
+ *
+ */
+public enum MapreduceSubmissionError implements ErrorCode {
+
+ MAPREDUCE_0001("Unknown error"),
+
+ MAPREDUCE_0002("Failure on submission engine initialization"),
+
+ MAPREDUCE_0003("Can't get RunningJob instance"),
+
+ MAPREDUCE_0004("Unknown map reduce job status"),
+
+ ;
+
+ private final String message;
+
+ private MapreduceSubmissionError(String message) {
+ this.message = message;
+ }
+
+ public String getCode() {
+ return name();
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1cd3c373/submission/pom.xml
----------------------------------------------------------------------
diff --git a/submission/pom.xml b/submission/pom.xml
new file mode 100644
index 0000000..16550d9
--- /dev/null
+++ b/submission/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache</groupId>
+ <artifactId>sqoop</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>submission</artifactId>
+ <name>Sqoop Submission Engines</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>mapreduce</module>
+ </modules>
+
+</project>