You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 04:51:44 UTC
[15/52] [abbrv] SQOOP-1497: Sqoop2: Entity Nomenclature Revisited
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
new file mode 100644
index 0000000..277c6be
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -0,0 +1,712 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ConnectorManager;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.core.Reconfigurable;
+import org.apache.sqoop.core.SqoopConfiguration;
+import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
+import org.apache.sqoop.driver.configuration.JobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.job.etl.Transferable;
+import org.apache.sqoop.model.FormUtils;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.repository.Repository;
+import org.apache.sqoop.repository.RepositoryManager;
+import org.apache.sqoop.request.HttpEventContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.submission.SubmissionStatus;
+import org.apache.sqoop.submission.counter.Counters;
+import org.apache.sqoop.utils.ClassUtils;
+import org.json.simple.JSONValue;
+
+public class JobManager implements Reconfigurable {
+ /**
+ * Logger object.
+ */
+ private static final Logger LOG = Logger.getLogger(JobManager.class);
+
+ /**
+ * Private instance to singleton of this class.
+ */
+ private static JobManager instance;
+ /**
+ * Create default object by default.
+ *
+ * Every Sqoop server application needs one so this should not be performance
+ * issue.
+ */
+ static {
+ instance = new JobManager();
+ }
+
+ /**
+ * Return current instance.
+ *
+ * @return Current instance
+ */
+ public static JobManager getInstance() {
+ return instance;
+ }
+
+ /**
+ * Allows to set instance in case that it's need.
+ *
+ * This method should not be normally used as the default instance should be
+ * sufficient. One target user use case for this method are unit tests.
+ *
+ * @param newInstance
+ * New instance
+ */
+ public static void setInstance(JobManager newInstance) {
+ instance = newInstance;
+ }
+
+ /**
+ * Default interval for purging old submissions from repository.
+ */
+ private static final long DEFAULT_PURGE_THRESHOLD = 24 * 60 * 60 * 1000;
+
+ /**
+ * Default sleep interval for purge thread.
+ */
+ private static final long DEFAULT_PURGE_SLEEP = 24 * 60 * 60 * 1000;
+
+ /**
+ * Default interval for update thread.
+ */
+ private static final long DEFAULT_UPDATE_SLEEP = 60 * 5 * 1000;
+
+ /**
+ * Configured submission engine instance
+ */
+ private SubmissionEngine submissionEngine;
+
+ /**
+ * Configured execution engine instance
+ */
+ private ExecutionEngine executionEngine;
+
+ /**
+ * Purge thread that will periodically remove old submissions from repository.
+ */
+ private PurgeThread purgeThread = null;
+
+ /**
+ * Update thread that will periodically check status of running submissions.
+ */
+ private UpdateThread updateThread = null;
+
+ /**
+ * Synchronization variable between threads.
+ */
+ private boolean running = true;
+
+ /**
+ * Specifies how old submissions should be removed from repository.
+ */
+ private long purgeThreshold;
+
+ /**
+ * Number of milliseconds for purge thread to sleep.
+ */
+ private long purgeSleep;
+
+ /**
+ * Number of milliseconds for update thread to slepp.
+ */
+ private long updateSleep;
+
+ /**
+ * Base notification URL.
+ *
+ * Driver manager will always add job id.
+ */
+ private String notificationBaseUrl;
+
+ /**
+ * Set notification base URL.
+ *
+ * @param url
+ * Base URL
+ */
+ public void setNotificationBaseUrl(String url) {
+ LOG.debug("Setting notification base URL to " + url);
+ notificationBaseUrl = url;
+ }
+
+ /**
+ * Get base notification url.
+ *
+ * @return String representation of the URL
+ */
+ public String getNotificationBaseUrl() {
+ return notificationBaseUrl;
+ }
+
+ public synchronized void destroy() {
+ LOG.trace("Begin submission engine manager destroy");
+
+ running = false;
+
+ try {
+ purgeThread.interrupt();
+ purgeThread.join();
+ } catch (InterruptedException e) {
+ // TODO(jarcec): Do I want to wait until it actually finish here?
+ LOG.error("Interrupted joining purgeThread");
+ }
+
+ try {
+ updateThread.interrupt();
+ updateThread.join();
+ } catch (InterruptedException e) {
+ // TODO(jarcec): Do I want to wait until it actually finish here?
+ LOG.error("Interrupted joining updateThread");
+ }
+
+ if (submissionEngine != null) {
+ submissionEngine.destroy();
+ }
+
+ if (executionEngine != null) {
+ executionEngine.destroy();
+ }
+ }
+
+ public synchronized void initialize() {
+ LOG.trace("Begin submission engine manager initialization");
+ MapContext context = SqoopConfiguration.getInstance().getContext();
+
+ // Let's load configured submission engine
+ String submissionEngineClassName =
+ context.getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);
+
+ submissionEngine = (SubmissionEngine) ClassUtils
+ .instantiate(submissionEngineClassName);
+ if (submissionEngine == null) {
+ throw new SqoopException(DriverError.DRIVER_0001,
+ submissionEngineClassName);
+ }
+
+ submissionEngine.initialize(context,
+ DriverConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
+
+ // Execution engine
+ String executionEngineClassName =
+ context.getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);
+
+ executionEngine = (ExecutionEngine) ClassUtils
+ .instantiate(executionEngineClassName);
+ if (executionEngine == null) {
+ throw new SqoopException(DriverError.DRIVER_0007,
+ executionEngineClassName);
+ }
+
+ // We need to make sure that user has configured compatible combination of
+ // submission engine and execution engine
+ if (!submissionEngine
+ .isExecutionEngineSupported(executionEngine.getClass())) {
+ throw new SqoopException(DriverError.DRIVER_0008);
+ }
+
+ executionEngine.initialize(context,
+ DriverConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
+
+ // Set up worker threads
+ purgeThreshold = context.getLong(
+ DriverConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
+ DEFAULT_PURGE_THRESHOLD
+ );
+ purgeSleep = context.getLong(
+ DriverConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
+ DEFAULT_PURGE_SLEEP
+ );
+
+ purgeThread = new PurgeThread();
+ purgeThread.start();
+
+ updateSleep = context.getLong(
+ DriverConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
+ DEFAULT_UPDATE_SLEEP
+ );
+
+ updateThread = new UpdateThread();
+ updateThread.start();
+
+ SqoopConfiguration.getInstance().getProvider()
+ .registerListener(new CoreConfigurationListener(this));
+
+ LOG.info("Submission manager initialized: OK");
+ }
+
+ public MSubmission submit(long jobId, HttpEventContext ctx) {
+
+ MSubmission mSubmission = createJobSubmission(ctx, jobId);
+ JobRequest jobRequest = createJobRequest(jobId, mSubmission);
+ // Bootstrap job to execute
+ prepareJob(jobRequest);
+ // Make sure that this job id is not currently running and submit the job
+ // only if it's not.
+ synchronized (getClass()) {
+ MSubmission lastSubmission = RepositoryManager.getInstance().getRepository()
+ .findSubmissionLastForJob(jobId);
+ if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
+ throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId);
+ }
+ // TODO(Abe): Call multiple destroyers.
+ // TODO(jarcec): We might need to catch all exceptions here to ensure
+ // that Destroyer will be executed in all cases.
+ // NOTE: the following is a blocking call
+ boolean success = submissionEngine.submit(jobRequest);
+ if (!success) {
+ destroySubmission(jobRequest);
+ mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
+ }
+ RepositoryManager.getInstance().getRepository().createSubmission(mSubmission);
+ }
+ return mSubmission;
+ }
+
+ private JobRequest createJobRequest(long jobId, MSubmission submission) {
+ // get job
+ MJob job = getJob(jobId);
+
+ // get from/to connections for the job
+ MLink fromConnection = getLink(job.getLinkId(Direction.FROM));
+ MLink toConnection = getLink(job.getLinkId(Direction.TO));
+
+ // get from/to connectors for the connection
+ SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId());
+ validateSupportedDirection(fromConnector, Direction.FROM);
+ SqoopConnector toConnector = getConnector(toConnection.getConnectorId());
+ validateSupportedDirection(toConnector, Direction.TO);
+
+ // Transform config to fromConnector specific classes
+ Object fromConnectionConfig = ClassUtils.instantiate(fromConnector
+ .getLinkConfigurationClass());
+ FormUtils.fromForms(fromConnection.getConnectorPart().getForms(), fromConnectionConfig);
+
+ // Transform config to toConnector specific classes
+ Object toConnectorConfig = ClassUtils
+ .instantiate(toConnector.getLinkConfigurationClass());
+ FormUtils.fromForms(toConnection.getConnectorPart().getForms(), toConnectorConfig);
+
+ Object fromJob = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM));
+ FormUtils.fromForms(job.getConnectorPart(Direction.FROM).getForms(), fromJob);
+
+ Object toJob = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO));
+ FormUtils.fromForms(job.getConnectorPart(Direction.TO).getForms(), toJob);
+
+ // Transform framework specific configs
+ // Q(VB) : Aren't the following 2 exactly the same?
+ Object fromDriverConnection = ClassUtils.instantiate(Driver.getInstance()
+ .getLinkConfigurationClass());
+ FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(), fromDriverConnection);
+
+ Object toDriverConnection = ClassUtils.instantiate(Driver.getInstance()
+ .getLinkConfigurationClass());
+ FormUtils.fromForms(toConnection.getFrameworkPart().getForms(), toDriverConnection);
+
+ Object frameworkJob = ClassUtils.instantiate(Driver.getInstance()
+ .getJobConfigurationClass());
+ FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob);
+
+ // Create a job request for submit/execution
+ JobRequest jobRequest = executionEngine.createJobRequest();
+ // Save important variables to the job request
+ jobRequest.setSummary(submission);
+ jobRequest.setConnector(Direction.FROM, fromConnector);
+ jobRequest.setConnector(Direction.TO, toConnector);
+ jobRequest.setConnectorLinkConfig(Direction.FROM, fromConnectionConfig);
+ jobRequest.setConnectorLinkConfig(Direction.TO, toConnectorConfig);
+ jobRequest.setConnectorJobConfig(Direction.FROM, fromJob);
+ jobRequest.setConnectorJobConfig(Direction.TO, toJob);
+ // TODO(Abe): Should we actually have 2 different Driver Connection config objects?
+ jobRequest.setFrameworkLinkConfig(Direction.FROM, fromDriverConnection);
+ jobRequest.setFrameworkLinkConfig(Direction.TO, toDriverConnection);
+ jobRequest.setFrameworkJobConfig(frameworkJob);
+ jobRequest.setJobName(job.getName());
+ jobRequest.setJobId(job.getPersistenceId());
+ jobRequest.setNotificationUrl(notificationBaseUrl + jobId);
+ Class<? extends IntermediateDataFormat<?>> dataFormatClass =
+ fromConnector.getIntermediateDataFormat();
+ jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
+
+
+ jobRequest.setFrom(fromConnector.getFrom());
+ jobRequest.setTo(toConnector.getTo());
+
+ addStandardJars(jobRequest);
+ addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass);
+ addConnectorInitializerJars(jobRequest, Direction.FROM);
+ addConnectorInitializerJars(jobRequest, Direction.TO);
+
+ Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM);
+ Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO);
+
+ // TODO(Gwen): Need better logic here once the Schema refactor: SQOOP-1378
+ if (fromSchema != null) {
+ jobRequest.getSummary().setFromSchema(fromSchema);
+ }
+ else {
+ jobRequest.getSummary().setFromSchema(toSchema);
+ }
+ LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo());
+ return jobRequest;
+ }
+
+ private void addConnectorJars(JobRequest jobRequest, SqoopConnector fromConnector,
+ SqoopConnector toConnector, Class<? extends IntermediateDataFormat<?>> dataFormatClass) {
+ jobRequest.addJarForClass(fromConnector.getClass());
+ jobRequest.addJarForClass(toConnector.getClass());
+ jobRequest.addJarForClass(dataFormatClass);
+ }
+
+ private void addStandardJars(JobRequest jobRequest) {
+ // Let's register all important jars
+ // sqoop-common
+ jobRequest.addJarForClass(MapContext.class);
+ // sqoop-core
+ jobRequest.addJarForClass(Driver.class);
+ // sqoop-spi
+ jobRequest.addJarForClass(SqoopConnector.class);
+ // Execution engine jar
+ jobRequest.addJarForClass(executionEngine.getClass());
+ // Extra libraries that Sqoop code requires
+ jobRequest.addJarForClass(JSONValue.class);
+ }
+
+ MSubmission createJobSubmission(HttpEventContext ctx, long jobId) {
+ MSubmission summary = new MSubmission(jobId);
+ summary.setCreationUser(ctx.getUsername());
+ summary.setLastUpdateUser(ctx.getUsername());
+ return summary;
+ }
+
+ SqoopConnector getConnector(long connnectorId) {
+ return ConnectorManager.getInstance().getConnector(connnectorId);
+ }
+
+ void validateSupportedDirection(SqoopConnector connector, Direction direction) {
+ // Make sure that connector supports the given direction
+ if (!connector.getSupportedDirections().contains(direction)) {
+ throw new SqoopException(DriverError.DRIVER_0011, "Connector: "
+ + connector.getClass().getCanonicalName());
+ }
+ }
+
+ MLink getLink(long linkId) {
+ MLink link = RepositoryManager.getInstance().getRepository()
+ .findLink(linkId);
+ if (!link.getEnabled()) {
+ throw new SqoopException(DriverError.DRIVER_0010, "Connection id: "
+ + link.getPersistenceId());
+ }
+ return link;
+ }
+
+ MJob getJob(long jobId) {
+ MJob job = RepositoryManager.getInstance().getRepository().findJob(jobId);
+ if (job == null) {
+ throw new SqoopException(DriverError.DRIVER_0004, "Unknown job id: " + jobId);
+ }
+
+ if (!job.getEnabled()) {
+ throw new SqoopException(DriverError.DRIVER_0009, "Job id: " + job.getPersistenceId());
+ }
+ return job;
+ }
+
+ private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction) {
+
+ Initializer initializer = getConnectorInitializer(jobRequest, direction);
+
+ // Initializer context
+ InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
+
+ // Initialize submission from the connector perspective
+ initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction),
+ jobRequest.getConnectorJobConfig(direction));
+
+ // TODO(Abe): Alter behavior of Schema here.
+ return initializer.getSchema(initializerContext,
+ jobRequest.getConnectorLinkConfig(direction),
+ jobRequest.getConnectorJobConfig(direction));
+ }
+
+ private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) {
+
+ Initializer initializer = getConnectorInitializer(jobRequest, direction);
+ InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
+ // Add job specific jars to
+ jobRequest.addJars(initializer.getJars(initializerContext,
+ jobRequest.getConnectorLinkConfig(direction),
+ jobRequest.getConnectorJobConfig(direction)));
+ }
+
+ private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
+ Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo();
+ Class<? extends Initializer> initializerClass = transferable.getInitializer();
+ Initializer initializer = (Initializer) ClassUtils.instantiate(initializerClass);
+
+ if (initializer == null) {
+ throw new SqoopException(DriverError.DRIVER_0006,
+ "Can't create connector initializer instance: " + initializerClass.getName());
+ }
+ return initializer;
+ }
+
+ private InitializerContext getConnectorInitializerContext(JobRequest jobRequest, Direction direction) {
+ return new InitializerContext(jobRequest.getConnectorContext(direction));
+ }
+
+ void prepareJob(JobRequest request) {
+ JobConfiguration jobConfiguration = (JobConfiguration) request.getFrameworkJobConfig();
+ // We're directly moving configured number of extractors and loaders to
+ // underlying request object. In the future we might need to throttle this
+ // count based on other running jobs to meet our SLAs.
+ request.setExtractors(jobConfiguration.throttling.extractors);
+ request.setLoaders(jobConfiguration.throttling.loaders);
+
+ // Delegate rest of the job to execution engine
+ executionEngine.prepareJob(request);
+ }
+
+ /**
+ * Callback that will be called only if we failed to submit the job to the
+ * remote cluster.
+ */
+ void destroySubmission(JobRequest request) {
+ Transferable from = request.getFrom();
+ Transferable to = request.getTo();
+
+ Class<? extends Destroyer> fromDestroyerClass = from.getDestroyer();
+ Class<? extends Destroyer> toDestroyerClass = to.getDestroyer();
+ Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromDestroyerClass);
+ Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toDestroyerClass);
+
+ if (fromDestroyer == null) {
+ throw new SqoopException(DriverError.DRIVER_0006,
+ "Can't create toDestroyer instance: " + fromDestroyerClass.getName());
+ }
+
+ if (toDestroyer == null) {
+ throw new SqoopException(DriverError.DRIVER_0006,
+ "Can't create toDestroyer instance: " + toDestroyerClass.getName());
+ }
+
+ // TODO(Abe): Update context to manage multiple connectors. As well as summary.
+ DestroyerContext fromDestroyerContext = new DestroyerContext(
+ request.getConnectorContext(Direction.FROM), false, request.getSummary()
+ .getFromSchema());
+ DestroyerContext toDestroyerContext = new DestroyerContext(
+ request.getConnectorContext(Direction.TO), false, request.getSummary()
+ .getToSchema());
+
+ // destroy submission from connector perspective
+ fromDestroyer.destroy(fromDestroyerContext, request.getConnectorLinkConfig(Direction.FROM),
+ request.getConnectorJobConfig(Direction.FROM));
+ toDestroyer.destroy(toDestroyerContext, request.getConnectorLinkConfig(Direction.TO),
+ request.getConnectorJobConfig(Direction.TO));
+ }
+
+ public MSubmission stop(long jobId, HttpEventContext ctx) {
+
+ Repository repository = RepositoryManager.getInstance().getRepository();
+ MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
+
+ if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
+ throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId
+ + " is not running");
+ }
+ submissionEngine.stop(mSubmission.getExternalId());
+
+ mSubmission.setLastUpdateUser(ctx.getUsername());
+
+ // Fetch new information to verify that the stop command has actually worked
+ update(mSubmission);
+
+ // Return updated structure
+ return mSubmission;
+ }
+
+ public MSubmission status(long jobId) {
+ Repository repository = RepositoryManager.getInstance().getRepository();
+ MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
+
+ if (mSubmission == null) {
+ return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
+ }
+
+ // If the submission is in running state, let's update it
+ if (mSubmission.getStatus().isRunning()) {
+ update(mSubmission);
+ }
+
+ return mSubmission;
+ }
+
+ private void update(MSubmission submission) {
+ double progress = -1;
+ Counters counters = null;
+ String externalId = submission.getExternalId();
+ SubmissionStatus newStatus = submissionEngine.status(externalId);
+ String externalLink = submissionEngine.externalLink(externalId);
+
+ if (newStatus.isRunning()) {
+ progress = submissionEngine.progress(externalId);
+ } else {
+ counters = submissionEngine.counters(externalId);
+ }
+
+ submission.setStatus(newStatus);
+ submission.setProgress(progress);
+ submission.setCounters(counters);
+ submission.setExternalLink(externalLink);
+ submission.setLastUpdateDate(new Date());
+
+ RepositoryManager.getInstance().getRepository()
+ .updateSubmission(submission);
+ }
+
+ @Override
+ public synchronized void configurationChanged() {
+ LOG.info("Begin submission engine manager reconfiguring");
+ MapContext newContext = SqoopConfiguration.getInstance().getContext();
+ MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
+
+ String newSubmissionEngineClassName = newContext
+ .getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);
+ if (newSubmissionEngineClassName == null
+ || newSubmissionEngineClassName.trim().length() == 0) {
+ throw new SqoopException(DriverError.DRIVER_0001,
+ newSubmissionEngineClassName);
+ }
+
+ String oldSubmissionEngineClassName = oldContext
+ .getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);
+ if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) {
+ LOG.warn("Submission engine cannot be replaced at the runtime. " +
+ "You might need to restart the server.");
+ }
+
+ String newExecutionEngineClassName = newContext
+ .getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);
+ if (newExecutionEngineClassName == null
+ || newExecutionEngineClassName.trim().length() == 0) {
+ throw new SqoopException(DriverError.DRIVER_0007,
+ newExecutionEngineClassName);
+ }
+
+ String oldExecutionEngineClassName = oldContext
+ .getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);
+ if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) {
+ LOG.warn("Execution engine cannot be replaced at the runtime. " +
+ "You might need to restart the server.");
+ }
+
+ // Set up worker threads
+ purgeThreshold = newContext.getLong(
+ DriverConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
+ DEFAULT_PURGE_THRESHOLD
+ );
+ purgeSleep = newContext.getLong(
+ DriverConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
+ DEFAULT_PURGE_SLEEP
+ );
+ purgeThread.interrupt();
+
+ updateSleep = newContext.getLong(
+ DriverConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
+ DEFAULT_UPDATE_SLEEP
+ );
+ updateThread.interrupt();
+
+ LOG.info("Submission engine manager reconfigured.");
+ }
+
+ private class PurgeThread extends Thread {
+ public PurgeThread() {
+ super("PurgeThread");
+ }
+
+ public void run() {
+ LOG.info("Starting submission manager purge thread");
+
+ while (running) {
+ try {
+ LOG.info("Purging old submissions");
+ Date threshold = new Date((new Date()).getTime() - purgeThreshold);
+ RepositoryManager.getInstance().getRepository()
+ .purgeSubmissions(threshold);
+ Thread.sleep(purgeSleep);
+ } catch (InterruptedException e) {
+ LOG.debug("Purge thread interrupted", e);
+ }
+ }
+
+ LOG.info("Ending submission manager purge thread");
+ }
+ }
+
+ private class UpdateThread extends Thread {
+ public UpdateThread() {
+ super("UpdateThread");
+ }
+
+ public void run() {
+ LOG.info("Starting submission manager update thread");
+
+ while (running) {
+ try {
+ LOG.debug("Updating running submissions");
+
+ // Let's get all running submissions from repository to check them out
+ List<MSubmission> unfinishedSubmissions =
+ RepositoryManager.getInstance().getRepository()
+ .findSubmissionsUnfinished();
+
+ for (MSubmission submission : unfinishedSubmissions) {
+ update(submission);
+ }
+
+ Thread.sleep(updateSleep);
+ } catch (InterruptedException e) {
+ LOG.debug("Purge thread interrupted", e);
+ }
+ }
+
+ LOG.info("Ending submission manager update thread");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
new file mode 100644
index 0000000..63e1e49
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.DirectionError;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.etl.Transferable;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.utils.ClassUtils;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Submission details class is used when creating new submission and contains
+ * all information that we need to create a new submission (including mappers,
+ * reducers, ...).
+ */
+public class JobRequest {
+
+ /**
+ * Submission summary
+ */
+ MSubmission summary;
+
+ /**
+ * Original job name
+ */
+ String jobName;
+
+ /**
+ * Associated job (from metadata perspective) id
+ */
+ long jobId;
+
+ /**
+ * Connector instances associated with this submission request
+ */
+ SqoopConnector fromConnector;
+ SqoopConnector toConnector;
+
+ /**
+ * List of required local jars for the job
+ */
+ List<String> jars;
+
+ /**
+ * From entity
+ */
+ Transferable from;
+
+ /**
+ * To entity
+ */
+ Transferable to;
+
+ /**
+ * All configuration objects
+ */
+ Object fromConnectorLinkConfig;
+ Object toConnectorLinkConfig;
+ Object fromConnectorJobConfig;
+ Object toConnectorJobConfig;
+ Object fromFrameworkLinkConfig;
+ Object toFrameworkLinkConfig;
+ Object frameworkJobConfig;
+
+ /**
+ * Connector context (submission specific configuration)
+ */
+ MutableMapContext fromConnectorContext;
+ MutableMapContext toConnectorContext;
+
+ /**
+ * Framework context (submission specific configuration)
+ */
+ MutableMapContext driverContext;
+
+ /**
+ * Optional notification URL for job progress
+ */
+ String notificationUrl;
+
+ /**
+ * Number of extractors
+ */
+ Integer extractors;
+
+ /**
+ * Number of loaders
+ */
+ Integer loaders;
+
+ /**
+ * The intermediate data format this submission should use.
+ */
+ Class<? extends IntermediateDataFormat> intermediateDataFormat;
+
+ public JobRequest() {
+ this.jars = new LinkedList<String>();
+ this.fromConnectorContext = new MutableMapContext();
+ this.toConnectorContext = new MutableMapContext();
+ this.driverContext = new MutableMapContext();
+ this.fromConnector = null;
+ this.toConnector = null;
+ this.fromConnectorLinkConfig = null;
+ this.toConnectorLinkConfig = null;
+ this.fromConnectorJobConfig = null;
+ this.toConnectorJobConfig = null;
+ this.fromFrameworkLinkConfig = null;
+ this.toFrameworkLinkConfig = null;
+ }
+
+ public MSubmission getSummary() {
+ return summary;
+ }
+
+ public void setSummary(MSubmission summary) {
+ this.summary = summary;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
+ public long getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(long jobId) {
+ this.jobId = jobId;
+ }
+
+ public SqoopConnector getConnector(Direction type) {
+ switch(type) {
+ case FROM:
+ return fromConnector;
+
+ case TO:
+ return toConnector;
+
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public void setConnector(Direction type, SqoopConnector connector) {
+ switch(type) {
+ case FROM:
+ fromConnector = connector;
+ break;
+
+ case TO:
+ toConnector = connector;
+ break;
+
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public List<String> getJars() {
+ return jars;
+ }
+
+ public void addJar(String jar) {
+ if(!jars.contains(jar)) {
+ jars.add(jar);
+ }
+ }
+
+ public void addJarForClass(Class klass) {
+ addJar(ClassUtils.jarForClass(klass));
+ }
+
+ public void addJars(List<String> jars) {
+ for(String j : jars) {
+ addJar(j);
+ }
+ }
+
+ public Transferable getFrom() {
+ return from;
+ }
+
+ public void setFrom(Transferable from) {
+ this.from = from;
+ }
+
+ public Transferable getTo() {
+ return to;
+ }
+
+ public void setTo(Transferable to) {
+ this.to = to;
+ }
+
+ public Object getConnectorLinkConfig(Direction type) {
+ switch(type) {
+ case FROM:
+ return fromConnectorLinkConfig;
+
+ case TO:
+ return toConnectorLinkConfig;
+
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public void setConnectorLinkConfig(Direction type, Object config) {
+ switch(type) {
+ case FROM:
+ fromConnectorLinkConfig = config;
+ break;
+ case TO:
+ toConnectorLinkConfig = config;
+ break;
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public Object getConnectorJobConfig(Direction type) {
+ switch(type) {
+ case FROM:
+ return fromConnectorJobConfig;
+
+ case TO:
+ return toConnectorJobConfig;
+
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public void setConnectorJobConfig(Direction type, Object config) {
+ switch(type) {
+ case FROM:
+ fromConnectorJobConfig = config;
+ break;
+ case TO:
+ toConnectorJobConfig = config;
+ break;
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public Object getFrameworkLinkConfig(Direction type) {
+ switch(type) {
+ case FROM:
+ return fromFrameworkLinkConfig;
+
+ case TO:
+ return toFrameworkLinkConfig;
+
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public void setFrameworkLinkConfig(Direction type, Object config) {
+ switch(type) {
+ case FROM:
+ fromFrameworkLinkConfig = config;
+ break;
+ case TO:
+ toFrameworkLinkConfig = config;
+ break;
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public Object getFrameworkJobConfig() {
+ return frameworkJobConfig;
+ }
+
+ public void setFrameworkJobConfig(Object config) {
+ frameworkJobConfig = config;
+ }
+
+ public MutableMapContext getConnectorContext(Direction type) {
+ switch(type) {
+ case FROM:
+ return fromConnectorContext;
+
+ case TO:
+ return toConnectorContext;
+
+ default:
+ throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+ }
+ }
+
+ public MutableMapContext getDriverContext() {
+ return driverContext;
+ }
+
+ public String getNotificationUrl() {
+ return notificationUrl;
+ }
+
+ public void setNotificationUrl(String url) {
+ this.notificationUrl = url;
+ }
+
+ public Integer getExtractors() {
+ return extractors;
+ }
+
+ public void setExtractors(Integer extractors) {
+ this.extractors = extractors;
+ }
+
+ public Integer getLoaders() {
+ return loaders;
+ }
+
+ public void setLoaders(Integer loaders) {
+ this.loaders = loaders;
+ }
+
+ public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() {
+ return intermediateDataFormat;
+ }
+
+ public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) {
+ this.intermediateDataFormat = intermediateDataFormat;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
new file mode 100644
index 0000000..3a32e9f
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver;
+
+import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.submission.counter.Counters;
+import org.apache.sqoop.submission.SubmissionStatus;
+
+/**
+ * Submission engine is responsible in conveying the information about the
+ * job instances (submissions) to remote (hadoop) cluster.
+ */
+public abstract class SubmissionEngine {
+
+ /**
+ * Initialize submission engine
+ *
+ * @param context Configuration context
+ * @param prefix Submission engine prefix
+ */
+ public void initialize(MapContext context, String prefix) {
+ }
+
+ /**
+ * Destroy submission engine when stopping server
+ */
+ public void destroy() {
+ }
+
+ /**
+ * Callback to verify that configured submission engine and execution engine
+ * are compatible.
+ *
+ * @param executionEngineClass Configured execution class.
+ * @return True if such execution engine is supported
+ */
+ public abstract boolean isExecutionEngineSupported(Class<?> executionEngineClass);
+
+ /**
+ * Submit new job to remote (hadoop) cluster. This method *must* fill
+ * submission.getSummary.setExternalId(), otherwise Sqoop won't
+ * be able to track progress on this job!
+ *
+ * @return Return true if we were able to submit job to remote cluster.
+ */
+ public abstract boolean submit(JobRequest submission);
+
+ /**
+ * Hard stop for given submission.
+ *
+ * @param submissionId Submission internal id.
+ */
+ public abstract void stop(String submissionId);
+
+ /**
+ * Return status of given submission.
+ *
+ * @param submissionId Submission internal id.
+ * @return Current submission status.
+ */
+ public abstract SubmissionStatus status(String submissionId);
+
+ /**
+ * Return submission progress.
+ *
+ * Expected is number from interval <0, 1> denoting how far the processing
+ * has gone or -1 in case that this submission engine do not supports
+ * progress reporting.
+ *
+ * @param submissionId Submission internal id.
+ * @return {-1} union <0, 1>
+ */
+ public double progress(String submissionId) {
+ return -1;
+ }
+
+ /**
+ * Return statistics for given submission id.
+ *
+ * Sqoop will call counters only for submission in state SUCCEEDED,
+ * it's consider exceptional state to call this method for other states.
+ *
+ * @param submissionId Submission internal id.
+ * @return Submission statistics
+ */
+ public Counters counters(String submissionId) {
+ return null;
+ }
+
+ /**
+ * Return link to external web page with given submission.
+ *
+ * @param submissionId Submission internal id.
+ * @return Null in case that external page is not supported or available or
+ * HTTP link to given submission.
+ */
+ public String externalLink(String submissionId) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java
new file mode 100644
index 0000000..908a4eb
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+/**
+ * Representing the core job configuration
+ */
+@ConfigurationClass
+public class JobConfiguration {
+ @Form
+ public ThrottlingForm throttling;
+
+ public JobConfiguration() {
+ throttling = new ThrottlingForm();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/configuration/LinkConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/LinkConfiguration.java b/core/src/main/java/org/apache/sqoop/driver/configuration/LinkConfiguration.java
new file mode 100644
index 0000000..3202844
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/configuration/LinkConfiguration.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+
+/**
+ * Representing the core link configuration
+ */
+@ConfigurationClass
+public class LinkConfiguration {
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/configuration/ThrottlingForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/ThrottlingForm.java b/core/src/main/java/org/apache/sqoop/driver/configuration/ThrottlingForm.java
new file mode 100644
index 0000000..e73007e
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/configuration/ThrottlingForm.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ * Form to set up number of loaders and extractors
+ */
+@FormClass
+public class ThrottlingForm {
+
+ @Input public Integer extractors;
+
+ @Input public Integer loaders;
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
deleted file mode 100644
index 75b570d..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.common.ImmutableContext;
-
-/**
- * Execution engine drives execution of sqoop job. It's responsible
- * for executing all defined steps in the import/export workflow.
- * A successful job execution will be recorded in the job submission entity
- */
-public abstract class ExecutionEngine {
-
- /**
- * Initialize execution engine
- *
- * @param context Configuration context
- * @parma prefix Execution engine prefix
- */
- public void initialize(ImmutableContext context, String prefix) {
- }
-
- /**
- * Destroy execution engine when stopping server
- */
- public void destroy() {
- }
-
- /**
- * Return new JobRequest class or any subclass if it's needed by
- * execution and submission engine combination.
- *
- * @return new JobRequestobject
- */
- public JobRequest createJobRequest() {
- return new JobRequest();
- }
-
- /**
- * Prepare given job request.
- *
- * @param request JobRequest
- */
- public abstract void prepareJob(JobRequest request);
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
deleted file mode 100644
index 4293dce..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.core.ConfigurationConstants;
-
-/**
- * Constants that are used in framework module.
- */
-public final class FrameworkConstants {
-
- // Sqoop configuration constants
-
- public static final String PREFIX_SUBMISSION_CONFIG =
- ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "submission.";
-
- public static final String PREFIX_EXECUTION_CONFIG =
- ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "execution.";
-
- public static final String SYSCFG_SUBMISSION_ENGINE =
- PREFIX_SUBMISSION_CONFIG + "engine";
-
- public static final String PREFIX_SUBMISSION_ENGINE_CONFIG =
- SYSCFG_SUBMISSION_ENGINE + ".";
-
- public static final String PREFIX_SUBMISSION_PURGE_CONFIG =
- PREFIX_SUBMISSION_CONFIG + "purge.";
-
- public static final String SYSCFG_SUBMISSION_PURGE_THRESHOLD =
- PREFIX_SUBMISSION_PURGE_CONFIG + "threshold";
-
- public static final String SYSCFG_SUBMISSION_PURGE_SLEEP =
- PREFIX_SUBMISSION_PURGE_CONFIG + "sleep";
-
- public static final String PREFIX_SUBMISSION_UPDATE_CONFIG =
- PREFIX_SUBMISSION_CONFIG + "update.";
-
- public static final String SYSCFG_SUBMISSION_UPDATE_SLEEP =
- PREFIX_SUBMISSION_UPDATE_CONFIG + "sleep";
-
- public static final String SYSCFG_EXECUTION_ENGINE =
- PREFIX_EXECUTION_CONFIG + "engine";
-
- public static final String PREFIX_EXECUTION_ENGINE_CONFIG =
- SYSCFG_EXECUTION_ENGINE + ".";
-
- // Bundle names
-
- public static final String RESOURCE_BUNDLE_NAME = "framework-resources";
-
- private FrameworkConstants() {
- // Instantiation of this class is prohibited
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
deleted file mode 100644
index 8ecb197..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.common.ErrorCode;
-
-/**
- *
- */
-public enum FrameworkError implements ErrorCode {
-
- FRAMEWORK_0000("Metadata are not registered in repository"),
-
- FRAMEWORK_0001("Invalid submission engine"),
-
- FRAMEWORK_0002("Given job is already running"),
-
- FRAMEWORK_0003("Given job is not running"),
-
- FRAMEWORK_0004("Unknown job id"),
-
- FRAMEWORK_0005("Unsupported job type"),
-
- FRAMEWORK_0006("Can't bootstrap job"),
-
- FRAMEWORK_0007("Invalid execution engine"),
-
- FRAMEWORK_0008("Invalid combination of submission and execution engines"),
-
- FRAMEWORK_0009("Job has been disabled. Cannot submit this job."),
-
- FRAMEWORK_0010("Connection for this job has been disabled. Cannot submit this job."),
-
- FRAMEWORK_0011("Connector does not support direction. Cannot submit this job."),
-
- ;
-
- private final String message;
-
- private FrameworkError(String message) {
- this.message = message;
- }
-
- public String getCode() {
- return name();
- }
-
- public String getMessage() {
- return message;
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
deleted file mode 100644
index 81e1147..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.connector.spi.MetadataUpgrader;
-import org.apache.sqoop.core.ConfigurationConstants;
-import org.apache.sqoop.core.Reconfigurable;
-import org.apache.sqoop.core.SqoopConfiguration;
-import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.JobConfiguration;
-import org.apache.sqoop.model.*;
-import org.apache.sqoop.repository.RepositoryManager;
-import org.apache.sqoop.validation.Validator;
-
-import java.util.Locale;
-import java.util.ResourceBundle;
-
-/**
- * Manager for Sqoop framework itself.
- *
- * All Sqoop internals are handled in this class:
- * * Submission engine
- * * Execution engine
- * * Framework metadata
- *
- * Current implementation of entire submission engine is using repository
- * for keeping track of running submissions. Thus, server might be restarted at
- * any time without any affect on running jobs. This approach however might not
- * be the fastest way and we might want to introduce internal structures for
- * running jobs in case that this approach will be too slow.
- */
-public class FrameworkManager implements Reconfigurable {
-
- /**
- * Logger object.
- */
- private static final Logger LOG = Logger.getLogger(FrameworkManager.class);
-
- /**
- * Private instance to singleton of this class.
- */
- private static FrameworkManager instance;
-
- /**
- * Create default object by default.
- *
- * Every Sqoop server application needs one so this should not be performance issue.
- */
- static {
- instance = new FrameworkManager();
- }
-
- /**
- * Return current instance.
- *
- * @return Current instance
- */
- public static FrameworkManager getInstance() {
- return instance;
- }
-
- /**
- * Allows to set instance in case that it's need.
- *
- * This method should not be normally used as the default instance should be sufficient. One target
- * user use case for this method are unit tests.
- *
- * @param newInstance New instance
- */
- public static void setInstance(FrameworkManager newInstance) {
- instance = newInstance;
- }
-
- /**
- * Framework metadata structures in MForm format
- */
- private MFramework mFramework;
-
- /**
- * Validator instance
- */
- private final Validator validator;
-
- /**
- * Upgrader instance
- */
- private final MetadataUpgrader upgrader;
-
- /**
- * Default framework auto upgrade option value
- */
- private static final boolean DEFAULT_AUTO_UPGRADE = false;
-
- public static final String CURRENT_FRAMEWORK_VERSION = "1";
-
- public Class getJobConfigurationClass() {
- return JobConfiguration.class;
- }
-
- public Class getConnectionConfigurationClass() {
- return ConnectionConfiguration.class;
- }
-
- public FrameworkManager() {
- MConnectionForms connectionForms = new MConnectionForms(
- FormUtils.toForms(getConnectionConfigurationClass())
- );
- mFramework = new MFramework(connectionForms, new MJobForms(FormUtils.toForms(getJobConfigurationClass())),
- CURRENT_FRAMEWORK_VERSION);
-
- // Build validator
- validator = new FrameworkValidator();
-
- // Build upgrader
- upgrader = new FrameworkMetadataUpgrader();
- }
-
- public synchronized void initialize() {
- initialize(SqoopConfiguration.getInstance().getContext().getBoolean(ConfigurationConstants.FRAMEWORK_AUTO_UPGRADE, DEFAULT_AUTO_UPGRADE));
- }
-
- public synchronized void initialize(boolean autoUpgrade) {
- LOG.trace("Begin submission engine manager initialization");
-
- // Register framework metadata in repository
- mFramework = RepositoryManager.getInstance().getRepository().registerFramework(mFramework, autoUpgrade);
-
- SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
-
- LOG.info("Submission manager initialized: OK");
- }
-
- public synchronized void destroy() {
- LOG.trace("Begin submission engine manager destroy");
- }
-
- public Validator getValidator() {
- return validator;
- }
-
- public MetadataUpgrader getMetadataUpgrader() {
- return upgrader;
- }
-
- public MFramework getFramework() {
- return mFramework;
- }
-
- public ResourceBundle getBundle(Locale locale) {
- return ResourceBundle.getBundle(
- FrameworkConstants.RESOURCE_BUNDLE_NAME, locale);
- }
-
- @Override
- public void configurationChanged() {
- LOG.info("Begin framework manager reconfiguring");
- // If there are configuration options for FrameworkManager,
- // implement the reconfiguration procedure right here.
- LOG.info("Framework manager reconfigured");
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java
deleted file mode 100644
index 2437fa6..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.spi.MetadataUpgrader;
-import org.apache.sqoop.model.MConnectionForms;
-import org.apache.sqoop.model.MForm;
-import org.apache.sqoop.model.MInput;
-import org.apache.sqoop.model.MJobForms;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class FrameworkMetadataUpgrader extends MetadataUpgrader{
-
- private static final Logger LOG = Logger.getLogger(FrameworkMetadataUpgrader.class);
-
- @Override
- public void upgrade(MConnectionForms original,
- MConnectionForms upgradeTarget) {
- doUpgrade(original.getForms(), upgradeTarget.getForms());
- }
-
- @Override
- public void upgrade(MJobForms original, MJobForms upgradeTarget) {
- doUpgrade(original.getForms(), upgradeTarget.getForms());
-
- }
-
- @SuppressWarnings("unchecked")
- private void doUpgrade(List<MForm> original, List<MForm> target) {
- // Easier to find the form in the original forms list if we use a map.
- // Since the constructor of MJobForms takes a list,
- // index is not guaranteed to be the same, so we need to look for
- // equivalence
- Map<String, MForm> formMap = new HashMap<String, MForm>();
- for (MForm form : original) {
- formMap.put(form.getName(), form);
- }
- for (MForm form : target) {
- List<MInput<?>> inputs = form.getInputs();
- MForm originalForm = formMap.get(form.getName());
- if(originalForm == null) {
- LOG.warn("Form: " + form.getName() + " not present in old " +
- "framework metadata. So it will not be transferred by the upgrader.");
- continue;
- }
-
- for (MInput input : inputs) {
- try {
- MInput originalInput = originalForm.getInput(input.getName());
- input.setValue(originalInput.getValue());
- } catch (SqoopException ex) {
- LOG.warn("Input: " + input.getName() + " not present in old " +
- "framework metadata. So it will not be transferred by the upgrader.");
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
deleted file mode 100644
index 46257f2..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.JobConfiguration;
-import org.apache.sqoop.framework.configuration.ThrottlingForm;
-import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.Validation;
-import org.apache.sqoop.validation.Validator;
-
-public class FrameworkValidator extends Validator {
- @Override
- public Validation validateConnection(Object connectionConfiguration) {
- Validation validation = new Validation(ConnectionConfiguration.class);
- // No validation on connection object
- return validation;
- }
-
- @Override
- public Validation validateJob(Object jobConfiguration) {
- Validation validation = new Validation(JobConfiguration.class);
- JobConfiguration conf = (JobConfiguration)jobConfiguration;
- validateThrottlingForm(validation,conf.throttling);
-
- return validation;
- };
-
- private void validateThrottlingForm(Validation validation, ThrottlingForm throttling) {
- if(throttling.extractors != null && throttling.extractors < 1) {
- validation.addMessage(Status.UNACCEPTABLE, "throttling", "extractors", "You need to specify more than one extractor");
- }
-
- if(throttling.loaders != null && throttling.loaders < 1) {
- validation.addMessage(Status.UNACCEPTABLE, "throttling", "loaders", "You need to specify more than one loader");
- }
- }
-
-}