You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 05:07:05 UTC

[13/50] [abbrv] SQOOP-1497: Sqoop2: Entity Nomenclature Revisited

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
new file mode 100644
index 0000000..277c6be
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -0,0 +1,712 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ConnectorManager;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.core.Reconfigurable;
+import org.apache.sqoop.core.SqoopConfiguration;
+import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
+import org.apache.sqoop.driver.configuration.JobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.job.etl.Transferable;
+import org.apache.sqoop.model.FormUtils;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.repository.Repository;
+import org.apache.sqoop.repository.RepositoryManager;
+import org.apache.sqoop.request.HttpEventContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.submission.SubmissionStatus;
+import org.apache.sqoop.submission.counter.Counters;
+import org.apache.sqoop.utils.ClassUtils;
+import org.json.simple.JSONValue;
+
+public class JobManager implements Reconfigurable {
+  /**
+   * Logger object.
+   */
+  private static final Logger LOG = Logger.getLogger(JobManager.class);
+
+  /**
+   * Private instance to singleton of this class.
+   */
+  private static JobManager instance;
+  /**
+   * Create default object by default.
+   *
+   * Every Sqoop server application needs one so this should not be performance
+   * issue.
+   */
+  static {
+    instance = new JobManager();
+  }
+
+  /**
+   * Return current instance.
+   *
+   * @return Current instance
+   */
+  public static JobManager getInstance() {
+    return instance;
+  }
+
+  /**
+   * Allows to set instance in case that it's need.
+   *
+   * This method should not be normally used as the default instance should be
+   * sufficient. One target user use case for this method are unit tests.
+   *
+   * @param newInstance
+   *          New instance
+   */
+  public static void setInstance(JobManager newInstance) {
+    instance = newInstance;
+  }
+
+  /**
+   * Default interval for purging old submissions from repository.
+   */
+  private static final long DEFAULT_PURGE_THRESHOLD = 24 * 60 * 60 * 1000;
+
+  /**
+   * Default sleep interval for purge thread.
+   */
+  private static final long DEFAULT_PURGE_SLEEP = 24 * 60 * 60 * 1000;
+
+  /**
+   * Default interval for update thread.
+   */
+  private static final long DEFAULT_UPDATE_SLEEP = 60 * 5 * 1000;
+
+  /**
+   * Configured submission engine instance
+   */
+  private SubmissionEngine submissionEngine;
+
+  /**
+   * Configured execution engine instance
+   */
+  private ExecutionEngine executionEngine;
+
+  /**
+   * Purge thread that will periodically remove old submissions from repository.
+   */
+  private PurgeThread purgeThread = null;
+
+  /**
+   * Update thread that will periodically check status of running submissions.
+   */
+  private UpdateThread updateThread = null;
+
+  /**
+   * Synchronization variable between threads.
+   */
+  private boolean running = true;
+
+  /**
+   * Specifies how old submissions should be removed from repository.
+   */
+  private long purgeThreshold;
+
+  /**
+   * Number of milliseconds for purge thread to sleep.
+   */
+  private long purgeSleep;
+
+  /**
+   * Number of milliseconds for update thread to slepp.
+   */
+  private long updateSleep;
+
+  /**
+   * Base notification URL.
+   *
+   * Driver manager will always add job id.
+   */
+  private String notificationBaseUrl;
+
+  /**
+   * Set notification base URL.
+   *
+   * @param url
+   *          Base URL
+   */
+  public void setNotificationBaseUrl(String url) {
+    LOG.debug("Setting notification base URL to " + url);
+    notificationBaseUrl = url;
+  }
+
+  /**
+   * Get base notification url.
+   *
+   * @return String representation of the URL
+   */
+  public String getNotificationBaseUrl() {
+    return notificationBaseUrl;
+  }
+
+  public synchronized void destroy() {
+    LOG.trace("Begin submission engine manager destroy");
+
+    running = false;
+
+    try {
+      purgeThread.interrupt();
+      purgeThread.join();
+    } catch (InterruptedException e) {
+      // TODO(jarcec): Do I want to wait until it actually finish here?
+      LOG.error("Interrupted joining purgeThread");
+    }
+
+    try {
+      updateThread.interrupt();
+      updateThread.join();
+    } catch (InterruptedException e) {
+      // TODO(jarcec): Do I want to wait until it actually finish here?
+      LOG.error("Interrupted joining updateThread");
+    }
+
+    if (submissionEngine != null) {
+      submissionEngine.destroy();
+    }
+
+    if (executionEngine != null) {
+      executionEngine.destroy();
+    }
+  }
+
+  public synchronized void initialize() {
+    LOG.trace("Begin submission engine manager initialization");
+    MapContext context = SqoopConfiguration.getInstance().getContext();
+
+    // Let's load configured submission engine
+    String submissionEngineClassName =
+      context.getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);
+
+    submissionEngine = (SubmissionEngine) ClassUtils
+      .instantiate(submissionEngineClassName);
+    if (submissionEngine == null) {
+      throw new SqoopException(DriverError.DRIVER_0001,
+        submissionEngineClassName);
+    }
+
+    submissionEngine.initialize(context,
+        DriverConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
+
+    // Execution engine
+    String executionEngineClassName =
+      context.getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);
+
+    executionEngine = (ExecutionEngine) ClassUtils
+      .instantiate(executionEngineClassName);
+    if (executionEngine == null) {
+      throw new SqoopException(DriverError.DRIVER_0007,
+        executionEngineClassName);
+    }
+
+    // We need to make sure that user has configured compatible combination of
+    // submission engine and execution engine
+    if (!submissionEngine
+      .isExecutionEngineSupported(executionEngine.getClass())) {
+      throw new SqoopException(DriverError.DRIVER_0008);
+    }
+
+    executionEngine.initialize(context,
+        DriverConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
+
+    // Set up worker threads
+    purgeThreshold = context.getLong(
+      DriverConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
+      DEFAULT_PURGE_THRESHOLD
+      );
+    purgeSleep = context.getLong(
+      DriverConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
+      DEFAULT_PURGE_SLEEP
+      );
+
+    purgeThread = new PurgeThread();
+    purgeThread.start();
+
+    updateSleep = context.getLong(
+      DriverConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
+      DEFAULT_UPDATE_SLEEP
+      );
+
+    updateThread = new UpdateThread();
+    updateThread.start();
+
+    SqoopConfiguration.getInstance().getProvider()
+      .registerListener(new CoreConfigurationListener(this));
+
+    LOG.info("Submission manager initialized: OK");
+  }
+
+  public MSubmission submit(long jobId, HttpEventContext ctx) {
+
+    MSubmission mSubmission = createJobSubmission(ctx, jobId);
+    JobRequest jobRequest = createJobRequest(jobId, mSubmission);
+    // Bootstrap job to execute
+    prepareJob(jobRequest);
+    // Make sure that this job id is not currently running and submit the job
+    // only if it's not.
+    synchronized (getClass()) {
+      MSubmission lastSubmission = RepositoryManager.getInstance().getRepository()
+          .findSubmissionLastForJob(jobId);
+      if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
+        throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId);
+      }
+      // TODO(Abe): Call multiple destroyers.
+      // TODO(jarcec): We might need to catch all exceptions here to ensure
+      // that Destroyer will be executed in all cases.
+      // NOTE: the following is a blocking call
+      boolean success = submissionEngine.submit(jobRequest);
+      if (!success) {
+        destroySubmission(jobRequest);
+        mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
+      }
+      RepositoryManager.getInstance().getRepository().createSubmission(mSubmission);
+    }
+    return mSubmission;
+  }
+
+  private JobRequest createJobRequest(long jobId, MSubmission submission) {
+    // get job
+    MJob job = getJob(jobId);
+
+    // get from/to connections for the job
+    MLink fromConnection = getLink(job.getLinkId(Direction.FROM));
+    MLink toConnection = getLink(job.getLinkId(Direction.TO));
+
+    // get from/to connectors for the connection
+    SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId());
+    validateSupportedDirection(fromConnector, Direction.FROM);
+    SqoopConnector toConnector = getConnector(toConnection.getConnectorId());
+    validateSupportedDirection(toConnector, Direction.TO);
+
+    // Transform config to fromConnector specific classes
+    Object fromConnectionConfig = ClassUtils.instantiate(fromConnector
+        .getLinkConfigurationClass());
+    FormUtils.fromForms(fromConnection.getConnectorPart().getForms(), fromConnectionConfig);
+
+    // Transform config to toConnector specific classes
+    Object toConnectorConfig = ClassUtils
+        .instantiate(toConnector.getLinkConfigurationClass());
+    FormUtils.fromForms(toConnection.getConnectorPart().getForms(), toConnectorConfig);
+
+    Object fromJob = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM));
+    FormUtils.fromForms(job.getConnectorPart(Direction.FROM).getForms(), fromJob);
+
+    Object toJob = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO));
+    FormUtils.fromForms(job.getConnectorPart(Direction.TO).getForms(), toJob);
+
+    // Transform framework specific configs
+    // Q(VB) : Aren't the following 2 exactly the same?
+    Object fromDriverConnection = ClassUtils.instantiate(Driver.getInstance()
+        .getLinkConfigurationClass());
+    FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(), fromDriverConnection);
+
+    Object toDriverConnection = ClassUtils.instantiate(Driver.getInstance()
+        .getLinkConfigurationClass());
+    FormUtils.fromForms(toConnection.getFrameworkPart().getForms(), toDriverConnection);
+
+    Object frameworkJob = ClassUtils.instantiate(Driver.getInstance()
+        .getJobConfigurationClass());
+    FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob);
+
+    // Create a job request for submit/execution
+    JobRequest jobRequest = executionEngine.createJobRequest();
+    // Save important variables to the job request
+    jobRequest.setSummary(submission);
+    jobRequest.setConnector(Direction.FROM, fromConnector);
+    jobRequest.setConnector(Direction.TO, toConnector);
+    jobRequest.setConnectorLinkConfig(Direction.FROM, fromConnectionConfig);
+    jobRequest.setConnectorLinkConfig(Direction.TO, toConnectorConfig);
+    jobRequest.setConnectorJobConfig(Direction.FROM, fromJob);
+    jobRequest.setConnectorJobConfig(Direction.TO, toJob);
+    // TODO(Abe): Should we actually have 2 different Driver Connection config objects?
+    jobRequest.setFrameworkLinkConfig(Direction.FROM, fromDriverConnection);
+    jobRequest.setFrameworkLinkConfig(Direction.TO, toDriverConnection);
+    jobRequest.setFrameworkJobConfig(frameworkJob);
+    jobRequest.setJobName(job.getName());
+    jobRequest.setJobId(job.getPersistenceId());
+    jobRequest.setNotificationUrl(notificationBaseUrl + jobId);
+    Class<? extends IntermediateDataFormat<?>> dataFormatClass =
+      fromConnector.getIntermediateDataFormat();
+    jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
+
+
+    jobRequest.setFrom(fromConnector.getFrom());
+    jobRequest.setTo(toConnector.getTo());
+
+    addStandardJars(jobRequest);
+    addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass);
+    addConnectorInitializerJars(jobRequest, Direction.FROM);
+    addConnectorInitializerJars(jobRequest, Direction.TO);
+
+    Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM);
+    Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO);
+
+    // TODO(Gwen): Need better logic here once the Schema refactor: SQOOP-1378
+    if (fromSchema != null) {
+      jobRequest.getSummary().setFromSchema(fromSchema);
+    }
+    else {
+      jobRequest.getSummary().setFromSchema(toSchema);
+    }
+    LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo());
+    return jobRequest;
+  }
+
+  private void addConnectorJars(JobRequest jobRequest, SqoopConnector fromConnector,
+      SqoopConnector toConnector, Class<? extends IntermediateDataFormat<?>> dataFormatClass) {
+    jobRequest.addJarForClass(fromConnector.getClass());
+    jobRequest.addJarForClass(toConnector.getClass());
+    jobRequest.addJarForClass(dataFormatClass);
+  }
+
+  private void addStandardJars(JobRequest jobRequest) {
+    // Let's register all important jars
+    // sqoop-common
+    jobRequest.addJarForClass(MapContext.class);
+    // sqoop-core
+    jobRequest.addJarForClass(Driver.class);
+    // sqoop-spi
+    jobRequest.addJarForClass(SqoopConnector.class);
+    // Execution engine jar
+    jobRequest.addJarForClass(executionEngine.getClass());
+    // Extra libraries that Sqoop code requires
+    jobRequest.addJarForClass(JSONValue.class);
+  }
+
+  MSubmission createJobSubmission(HttpEventContext ctx, long jobId) {
+    MSubmission summary = new MSubmission(jobId);
+    summary.setCreationUser(ctx.getUsername());
+    summary.setLastUpdateUser(ctx.getUsername());
+    return summary;
+  }
+
+  SqoopConnector getConnector(long connnectorId) {
+    return ConnectorManager.getInstance().getConnector(connnectorId);
+  }
+
+  void validateSupportedDirection(SqoopConnector connector, Direction direction) {
+    // Make sure that connector supports the given direction
+    if (!connector.getSupportedDirections().contains(direction)) {
+      throw new SqoopException(DriverError.DRIVER_0011, "Connector: "
+          + connector.getClass().getCanonicalName());
+    }
+  }
+
+  MLink getLink(long linkId) {
+    MLink link = RepositoryManager.getInstance().getRepository()
+        .findLink(linkId);
+    if (!link.getEnabled()) {
+      throw new SqoopException(DriverError.DRIVER_0010, "Connection id: "
+          + link.getPersistenceId());
+    }
+    return link;
+  }
+
+  MJob getJob(long jobId) {
+    MJob job = RepositoryManager.getInstance().getRepository().findJob(jobId);
+    if (job == null) {
+      throw new SqoopException(DriverError.DRIVER_0004, "Unknown job id: " + jobId);
+    }
+
+    if (!job.getEnabled()) {
+      throw new SqoopException(DriverError.DRIVER_0009, "Job id: " + job.getPersistenceId());
+    }
+    return job;
+  }
+  
+  private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction) {
+
+    Initializer initializer = getConnectorInitializer(jobRequest, direction);
+
+    // Initializer context
+    InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
+
+    // Initialize submission from the connector perspective
+    initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction),
+        jobRequest.getConnectorJobConfig(direction));
+
+    // TODO(Abe): Alter behavior of Schema here.
+    return initializer.getSchema(initializerContext,
+        jobRequest.getConnectorLinkConfig(direction),
+        jobRequest.getConnectorJobConfig(direction));
+  }
+
+  private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) {
+
+    Initializer initializer = getConnectorInitializer(jobRequest, direction);
+    InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction);
+    // Add job specific jars to
+    jobRequest.addJars(initializer.getJars(initializerContext,
+        jobRequest.getConnectorLinkConfig(direction),
+        jobRequest.getConnectorJobConfig(direction)));
+  }
+
+  private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
+    Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo();
+    Class<? extends Initializer> initializerClass = transferable.getInitializer();
+    Initializer initializer = (Initializer) ClassUtils.instantiate(initializerClass);
+
+    if (initializer == null) {
+      throw new SqoopException(DriverError.DRIVER_0006,
+          "Can't create connector initializer instance: " + initializerClass.getName());
+    }
+    return initializer;
+  }
+
+  private InitializerContext getConnectorInitializerContext(JobRequest jobRequest, Direction direction) {
+    return new InitializerContext(jobRequest.getConnectorContext(direction));
+  }
+
+  void prepareJob(JobRequest request) {
+    JobConfiguration jobConfiguration = (JobConfiguration) request.getFrameworkJobConfig();
+    // We're directly moving configured number of extractors and loaders to
+    // underlying request object. In the future we might need to throttle this
+    // count based on other running jobs to meet our SLAs.
+    request.setExtractors(jobConfiguration.throttling.extractors);
+    request.setLoaders(jobConfiguration.throttling.loaders);
+
+    // Delegate rest of the job to execution engine
+    executionEngine.prepareJob(request);
+  }
+
+  /**
+   * Callback that will be called only if we failed to submit the job to the
+   * remote cluster.
+   */
+  void destroySubmission(JobRequest request) {
+    Transferable from = request.getFrom();
+    Transferable to = request.getTo();
+
+    Class<? extends Destroyer> fromDestroyerClass = from.getDestroyer();
+    Class<? extends Destroyer> toDestroyerClass = to.getDestroyer();
+    Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromDestroyerClass);
+    Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toDestroyerClass);
+
+    if (fromDestroyer == null) {
+      throw new SqoopException(DriverError.DRIVER_0006,
+        "Can't create toDestroyer instance: " + fromDestroyerClass.getName());
+    }
+
+    if (toDestroyer == null) {
+      throw new SqoopException(DriverError.DRIVER_0006,
+          "Can't create toDestroyer instance: " + toDestroyerClass.getName());
+    }
+
+    // TODO(Abe): Update context to manage multiple connectors. As well as summary.
+    DestroyerContext fromDestroyerContext = new DestroyerContext(
+      request.getConnectorContext(Direction.FROM), false, request.getSummary()
+        .getFromSchema());
+    DestroyerContext toDestroyerContext = new DestroyerContext(
+        request.getConnectorContext(Direction.TO), false, request.getSummary()
+        .getToSchema());
+
+    // destroy submission from connector perspective
+    fromDestroyer.destroy(fromDestroyerContext, request.getConnectorLinkConfig(Direction.FROM),
+        request.getConnectorJobConfig(Direction.FROM));
+    toDestroyer.destroy(toDestroyerContext, request.getConnectorLinkConfig(Direction.TO),
+        request.getConnectorJobConfig(Direction.TO));
+  }
+
+  public MSubmission stop(long jobId, HttpEventContext ctx) {
+
+    Repository repository = RepositoryManager.getInstance().getRepository();
+    MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
+
+    if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
+      throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId
+          + " is not running");
+    }
+    submissionEngine.stop(mSubmission.getExternalId());
+
+    mSubmission.setLastUpdateUser(ctx.getUsername());
+
+    // Fetch new information to verify that the stop command has actually worked
+    update(mSubmission);
+
+    // Return updated structure
+    return mSubmission;
+  }
+
+  public MSubmission status(long jobId) {
+    Repository repository = RepositoryManager.getInstance().getRepository();
+    MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
+
+    if (mSubmission == null) {
+      return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
+    }
+
+    // If the submission is in running state, let's update it
+    if (mSubmission.getStatus().isRunning()) {
+      update(mSubmission);
+    }
+
+    return mSubmission;
+  }
+
+  private void update(MSubmission submission) {
+    double progress = -1;
+    Counters counters = null;
+    String externalId = submission.getExternalId();
+    SubmissionStatus newStatus = submissionEngine.status(externalId);
+    String externalLink = submissionEngine.externalLink(externalId);
+
+    if (newStatus.isRunning()) {
+      progress = submissionEngine.progress(externalId);
+    } else {
+      counters = submissionEngine.counters(externalId);
+    }
+
+    submission.setStatus(newStatus);
+    submission.setProgress(progress);
+    submission.setCounters(counters);
+    submission.setExternalLink(externalLink);
+    submission.setLastUpdateDate(new Date());
+
+    RepositoryManager.getInstance().getRepository()
+      .updateSubmission(submission);
+  }
+
+  @Override
+  public synchronized void configurationChanged() {
+    LOG.info("Begin submission engine manager reconfiguring");
+    MapContext newContext = SqoopConfiguration.getInstance().getContext();
+    MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
+
+    String newSubmissionEngineClassName = newContext
+      .getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);
+    if (newSubmissionEngineClassName == null
+      || newSubmissionEngineClassName.trim().length() == 0) {
+      throw new SqoopException(DriverError.DRIVER_0001,
+        newSubmissionEngineClassName);
+    }
+
+    String oldSubmissionEngineClassName = oldContext
+      .getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);
+    if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) {
+      LOG.warn("Submission engine cannot be replaced at the runtime. " +
+        "You might need to restart the server.");
+    }
+
+    String newExecutionEngineClassName = newContext
+      .getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);
+    if (newExecutionEngineClassName == null
+      || newExecutionEngineClassName.trim().length() == 0) {
+      throw new SqoopException(DriverError.DRIVER_0007,
+        newExecutionEngineClassName);
+    }
+
+    String oldExecutionEngineClassName = oldContext
+      .getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);
+    if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) {
+      LOG.warn("Execution engine cannot be replaced at the runtime. " +
+        "You might need to restart the server.");
+    }
+
+    // Set up worker threads
+    purgeThreshold = newContext.getLong(
+      DriverConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
+      DEFAULT_PURGE_THRESHOLD
+      );
+    purgeSleep = newContext.getLong(
+      DriverConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
+      DEFAULT_PURGE_SLEEP
+      );
+    purgeThread.interrupt();
+
+    updateSleep = newContext.getLong(
+      DriverConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
+      DEFAULT_UPDATE_SLEEP
+      );
+    updateThread.interrupt();
+
+    LOG.info("Submission engine manager reconfigured.");
+  }
+
+  private class PurgeThread extends Thread {
+    public PurgeThread() {
+      super("PurgeThread");
+    }
+
+    public void run() {
+      LOG.info("Starting submission manager purge thread");
+
+      while (running) {
+        try {
+          LOG.info("Purging old submissions");
+          Date threshold = new Date((new Date()).getTime() - purgeThreshold);
+          RepositoryManager.getInstance().getRepository()
+            .purgeSubmissions(threshold);
+          Thread.sleep(purgeSleep);
+        } catch (InterruptedException e) {
+          LOG.debug("Purge thread interrupted", e);
+        }
+      }
+
+      LOG.info("Ending submission manager purge thread");
+    }
+  }
+
+  private class UpdateThread extends Thread {
+    public UpdateThread() {
+      super("UpdateThread");
+    }
+
+    public void run() {
+      LOG.info("Starting submission manager update thread");
+
+      while (running) {
+        try {
+          LOG.debug("Updating running submissions");
+
+          // Let's get all running submissions from repository to check them out
+          List<MSubmission> unfinishedSubmissions =
+            RepositoryManager.getInstance().getRepository()
+              .findSubmissionsUnfinished();
+
+          for (MSubmission submission : unfinishedSubmissions) {
+            update(submission);
+          }
+
+          Thread.sleep(updateSleep);
+        } catch (InterruptedException e) {
+          LOG.debug("Purge thread interrupted", e);
+        }
+      }
+
+      LOG.info("Ending submission manager update thread");
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
new file mode 100644
index 0000000..63e1e49
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.DirectionError;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.etl.Transferable;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.utils.ClassUtils;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Submission details class is used when creating new submission and contains
+ * all information that we need to create a new submission (including mappers,
+ * reducers, ...).
+ */
+public class JobRequest {
+
+  /**
+   * Submission summary
+   */
+  MSubmission summary;
+
+  /**
+   * Original job name
+   */
+  String jobName;
+
+  /**
+   * Associated job (from metadata perspective) id
+   */
+  long jobId;
+
+  /**
+   * Connector instances associated with this submission request
+   */
+  SqoopConnector fromConnector;
+  SqoopConnector toConnector;
+
+  /**
+   * List of required local jars for the job
+   */
+  List<String> jars;
+
+  /**
+   * From entity
+   */
+  Transferable from;
+
+  /**
+   * To entity
+   */
+  Transferable to;
+
+  /**
+   * All configuration objects
+   */
+  Object fromConnectorLinkConfig;
+  Object toConnectorLinkConfig;
+  Object fromConnectorJobConfig;
+  Object toConnectorJobConfig;
+  Object fromFrameworkLinkConfig;
+  Object toFrameworkLinkConfig;
+  Object frameworkJobConfig;
+
+  /**
+   * Connector context (submission specific configuration)
+   */
+  MutableMapContext fromConnectorContext;
+  MutableMapContext toConnectorContext;
+
+  /**
+   * Framework context (submission specific configuration)
+   */
+  MutableMapContext driverContext;
+
+  /**
+   * Optional notification URL for job progress
+   */
+  String notificationUrl;
+
+  /**
+   * Number of extractors
+   */
+  Integer extractors;
+
+  /**
+   * Number of loaders
+   */
+  Integer loaders;
+
+  /**
+   * The intermediate data format this submission should use.
+   */
+  Class<? extends IntermediateDataFormat> intermediateDataFormat;
+
+  public JobRequest() {
+    this.jars = new LinkedList<String>();
+    this.fromConnectorContext = new MutableMapContext();
+    this.toConnectorContext = new MutableMapContext();
+    this.driverContext = new MutableMapContext();
+    this.fromConnector = null;
+    this.toConnector = null;
+    this.fromConnectorLinkConfig = null;
+    this.toConnectorLinkConfig = null;
+    this.fromConnectorJobConfig = null;
+    this.toConnectorJobConfig = null;
+    this.fromFrameworkLinkConfig = null;
+    this.toFrameworkLinkConfig = null;
+  }
+
+  public MSubmission getSummary() {
+    return summary;
+  }
+
+  public void setSummary(MSubmission summary) {
+    this.summary = summary;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  public long getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(long jobId) {
+    this.jobId = jobId;
+  }
+
+  public SqoopConnector getConnector(Direction type) {
+    switch(type) {
+      case FROM:
+        return fromConnector;
+
+      case TO:
+        return toConnector;
+
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public void setConnector(Direction type, SqoopConnector connector) {
+    switch(type) {
+      case FROM:
+        fromConnector = connector;
+        break;
+
+      case TO:
+        toConnector = connector;
+        break;
+
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public List<String> getJars() {
+    return jars;
+  }
+
+  public void addJar(String jar) {
+    if(!jars.contains(jar)) {
+      jars.add(jar);
+    }
+  }
+
+  public void addJarForClass(Class klass) {
+    addJar(ClassUtils.jarForClass(klass));
+  }
+
+  public void addJars(List<String> jars) {
+    for(String j : jars) {
+      addJar(j);
+    }
+  }
+
+  public Transferable getFrom() {
+    return from;
+  }
+
+  public void setFrom(Transferable from) {
+    this.from = from;
+  }
+
+  public Transferable getTo() {
+    return to;
+  }
+
+  public void setTo(Transferable to) {
+    this.to = to;
+  }
+
+  public Object getConnectorLinkConfig(Direction type) {
+    switch(type) {
+      case FROM:
+        return fromConnectorLinkConfig;
+
+      case TO:
+        return toConnectorLinkConfig;
+
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public void setConnectorLinkConfig(Direction type, Object config) {
+    switch(type) {
+      case FROM:
+        fromConnectorLinkConfig = config;
+        break;
+      case TO:
+        toConnectorLinkConfig = config;
+        break;
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public Object getConnectorJobConfig(Direction type) {
+    switch(type) {
+      case FROM:
+        return fromConnectorJobConfig;
+
+      case TO:
+        return toConnectorJobConfig;
+
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public void setConnectorJobConfig(Direction type, Object config) {
+    switch(type) {
+      case FROM:
+        fromConnectorJobConfig = config;
+        break;
+      case TO:
+        toConnectorJobConfig = config;
+        break;
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public Object getFrameworkLinkConfig(Direction type) {
+    switch(type) {
+      case FROM:
+        return fromFrameworkLinkConfig;
+
+      case TO:
+        return toFrameworkLinkConfig;
+
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public void setFrameworkLinkConfig(Direction type, Object config) {
+    switch(type) {
+      case FROM:
+        fromFrameworkLinkConfig = config;
+        break;
+      case TO:
+        toFrameworkLinkConfig = config;
+        break;
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public Object getFrameworkJobConfig() {
+    return frameworkJobConfig;
+  }
+
+  public void setFrameworkJobConfig(Object config) {
+    frameworkJobConfig = config;
+  }
+
+  public MutableMapContext getConnectorContext(Direction type) {
+    switch(type) {
+      case FROM:
+        return fromConnectorContext;
+
+      case TO:
+        return toConnectorContext;
+
+      default:
+        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+    }
+  }
+
+  public MutableMapContext getDriverContext() {
+    return driverContext;
+  }
+
+  public String getNotificationUrl() {
+    return notificationUrl;
+  }
+
+  public void setNotificationUrl(String url) {
+    this.notificationUrl = url;
+  }
+
+  public Integer getExtractors() {
+    return extractors;
+  }
+
+  public void setExtractors(Integer extractors) {
+    this.extractors = extractors;
+  }
+
+  public Integer getLoaders() {
+    return loaders;
+  }
+
+  public void setLoaders(Integer loaders) {
+    this.loaders = loaders;
+  }
+
+  public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() {
+    return intermediateDataFormat;
+  }
+
+  public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) {
+    this.intermediateDataFormat = intermediateDataFormat;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
new file mode 100644
index 0000000..3a32e9f
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/SubmissionEngine.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver;
+
+import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.submission.counter.Counters;
+import org.apache.sqoop.submission.SubmissionStatus;
+
+/**
+ * Submission engine is responsible in conveying the information about the
+ * job instances (submissions) to remote (hadoop) cluster.
+ */
+public abstract class SubmissionEngine {
+
+  /**
+   * Initialize submission engine
+   *
+   * @param context Configuration context
+   * @param prefix Submission engine prefix
+   */
+  public void initialize(MapContext context, String prefix) {
+  }
+
+  /**
+   * Destroy submission engine when stopping server
+   */
+  public void destroy() {
+  }
+
+  /**
+   * Callback to verify that configured submission engine and execution engine
+   * are compatible.
+   *
+   * @param executionEngineClass Configured execution class.
+   * @return True if such execution engine is supported
+   */
+  public abstract boolean isExecutionEngineSupported(Class<?> executionEngineClass);
+
+  /**
+   * Submit new job to remote (hadoop) cluster. This method *must* fill
+   * submission.getSummary.setExternalId(), otherwise Sqoop won't
+   * be able to track progress on this job!
+   *
+   * @return Return true if we were able to submit job to remote cluster.
+   */
+  public abstract boolean submit(JobRequest submission);
+
+  /**
+   * Hard stop for given submission.
+   *
+   * @param submissionId Submission internal id.
+   */
+  public abstract void stop(String submissionId);
+
+  /**
+   * Return status of given submission.
+   *
+   * @param submissionId Submission internal id.
+   * @return Current submission status.
+   */
+  public abstract SubmissionStatus status(String submissionId);
+
+  /**
+   * Return submission progress.
+   *
+   * Expected is number from interval <0, 1> denoting how far the processing
+   * has gone or -1 in case that this submission engine do not supports
+   * progress reporting.
+   *
+   * @param submissionId Submission internal id.
+   * @return {-1} union <0, 1>
+   */
+  public double progress(String submissionId) {
+    return -1;
+  }
+
+  /**
+   * Return statistics for given submission id.
+   *
+   * Sqoop will call counters only for submission in state SUCCEEDED,
+   * it's consider exceptional state to call this method for other states.
+   *
+   * @param submissionId Submission internal id.
+   * @return Submission statistics
+   */
+  public Counters counters(String submissionId) {
+    return null;
+  }
+
+  /**
+   * Return link to external web page with given submission.
+   *
+   * @param submissionId Submission internal id.
+   * @return Null in case that external page is not supported or available or
+   *  HTTP link to given submission.
+   */
+  public String externalLink(String submissionId) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java
new file mode 100644
index 0000000..908a4eb
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+/**
+ * Representing the core job configuration
+ */
+@ConfigurationClass
+public class JobConfiguration {
+  @Form
+  public ThrottlingForm throttling;
+
+  public JobConfiguration() {
+    throttling = new ThrottlingForm();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/configuration/LinkConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/LinkConfiguration.java b/core/src/main/java/org/apache/sqoop/driver/configuration/LinkConfiguration.java
new file mode 100644
index 0000000..3202844
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/configuration/LinkConfiguration.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+
+/**
+ * Representing the core link configuration
+ */
+@ConfigurationClass
+public class LinkConfiguration {
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/driver/configuration/ThrottlingForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/ThrottlingForm.java b/core/src/main/java/org/apache/sqoop/driver/configuration/ThrottlingForm.java
new file mode 100644
index 0000000..e73007e
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/driver/configuration/ThrottlingForm.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.driver.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ * Form to set up number of loaders and extractors
+ */
+@FormClass
+public class ThrottlingForm {
+
+  @Input public Integer extractors;
+
+  @Input public Integer loaders;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
deleted file mode 100644
index 75b570d..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.common.ImmutableContext;
-
-/**
- * Execution engine drives execution of sqoop job. It's responsible
- * for executing all defined steps in the import/export workflow.
- * A successful job execution will be recorded in the job submission entity
- */
-public abstract class ExecutionEngine {
-
-  /**
-   * Initialize execution engine
-   *
-   * @param context Configuration context
-   * @parma prefix Execution engine prefix
-   */
-  public void initialize(ImmutableContext context, String prefix) {
-  }
-
-  /**
-   * Destroy execution engine when stopping server
-   */
-  public void destroy() {
-  }
-
-  /**
-   * Return new JobRequest class or any subclass if it's needed by
-   * execution and submission engine combination.
-   *
-   * @return new JobRequestobject
-   */
-  public JobRequest createJobRequest() {
-    return new JobRequest();
-  }
-
-  /**
-   * Prepare given job request.
-   *
-   * @param request JobRequest
-   */
-  public abstract void prepareJob(JobRequest request);
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
deleted file mode 100644
index 4293dce..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.core.ConfigurationConstants;
-
-/**
- * Constants that are used in framework module.
- */
-public final class FrameworkConstants {
-
-  // Sqoop configuration constants
-
-  public static final String PREFIX_SUBMISSION_CONFIG =
-    ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "submission.";
-
-  public static final String PREFIX_EXECUTION_CONFIG =
-    ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "execution.";
-
-  public static final String SYSCFG_SUBMISSION_ENGINE =
-    PREFIX_SUBMISSION_CONFIG + "engine";
-
-  public static final String PREFIX_SUBMISSION_ENGINE_CONFIG =
-    SYSCFG_SUBMISSION_ENGINE + ".";
-
-  public static final String PREFIX_SUBMISSION_PURGE_CONFIG =
-    PREFIX_SUBMISSION_CONFIG + "purge.";
-
-  public static final String SYSCFG_SUBMISSION_PURGE_THRESHOLD =
-    PREFIX_SUBMISSION_PURGE_CONFIG + "threshold";
-
-  public static final String SYSCFG_SUBMISSION_PURGE_SLEEP =
-    PREFIX_SUBMISSION_PURGE_CONFIG + "sleep";
-
-  public static final String PREFIX_SUBMISSION_UPDATE_CONFIG =
-    PREFIX_SUBMISSION_CONFIG + "update.";
-
-  public static final String SYSCFG_SUBMISSION_UPDATE_SLEEP =
-    PREFIX_SUBMISSION_UPDATE_CONFIG + "sleep";
-
-  public static final String SYSCFG_EXECUTION_ENGINE =
-    PREFIX_EXECUTION_CONFIG + "engine";
-
-  public static final String PREFIX_EXECUTION_ENGINE_CONFIG =
-    SYSCFG_EXECUTION_ENGINE + ".";
-
-  // Bundle names
-
-  public static final String RESOURCE_BUNDLE_NAME = "framework-resources";
-
-  private FrameworkConstants() {
-    // Instantiation of this class is prohibited
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
deleted file mode 100644
index 8ecb197..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.common.ErrorCode;
-
-/**
- *
- */
-public enum FrameworkError implements ErrorCode {
-
-  FRAMEWORK_0000("Metadata are not registered in repository"),
-
-  FRAMEWORK_0001("Invalid submission engine"),
-
-  FRAMEWORK_0002("Given job is already running"),
-
-  FRAMEWORK_0003("Given job is not running"),
-
-  FRAMEWORK_0004("Unknown job id"),
-
-  FRAMEWORK_0005("Unsupported job type"),
-
-  FRAMEWORK_0006("Can't bootstrap job"),
-
-  FRAMEWORK_0007("Invalid execution engine"),
-
-  FRAMEWORK_0008("Invalid combination of submission and execution engines"),
-
-  FRAMEWORK_0009("Job has been disabled. Cannot submit this job."),
-
-  FRAMEWORK_0010("Connection for this job has been disabled. Cannot submit this job."),
-
-  FRAMEWORK_0011("Connector does not support direction. Cannot submit this job."),
-
-  ;
-
-  private final String message;
-
-  private FrameworkError(String message) {
-    this.message = message;
-  }
-
-  public String getCode() {
-    return name();
-  }
-
-  public String getMessage() {
-    return message;
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
deleted file mode 100644
index 81e1147..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.connector.spi.MetadataUpgrader;
-import org.apache.sqoop.core.ConfigurationConstants;
-import org.apache.sqoop.core.Reconfigurable;
-import org.apache.sqoop.core.SqoopConfiguration;
-import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.JobConfiguration;
-import org.apache.sqoop.model.*;
-import org.apache.sqoop.repository.RepositoryManager;
-import org.apache.sqoop.validation.Validator;
-
-import java.util.Locale;
-import java.util.ResourceBundle;
-
-/**
- * Manager for Sqoop framework itself.
- *
- * All Sqoop internals are handled in this class:
- * * Submission engine
- * * Execution engine
- * * Framework metadata
- *
- * Current implementation of entire submission engine is using repository
- * for keeping track of running submissions. Thus, server might be restarted at
- * any time without any affect on running jobs. This approach however might not
- * be the fastest way and we might want to introduce internal structures for
- * running jobs in case that this approach will be too slow.
- */
-public class FrameworkManager implements Reconfigurable {
-
-  /**
-   * Logger object.
-   */
-  private static final Logger LOG = Logger.getLogger(FrameworkManager.class);
-
-  /**
-   * Private instance to singleton of this class.
-   */
-  private static FrameworkManager instance;
-
-  /**
-   * Create default object by default.
-   *
-   * Every Sqoop server application needs one so this should not be performance issue.
-   */
-  static {
-    instance = new FrameworkManager();
-  }
-
-  /**
-   * Return current instance.
-   *
-   * @return Current instance
-   */
-  public static FrameworkManager getInstance() {
-    return instance;
-  }
-
-  /**
-   * Allows to set instance in case that it's need.
-   *
-   * This method should not be normally used as the default instance should be sufficient. One target
-   * user use case for this method are unit tests.
-   *
-   * @param newInstance New instance
-   */
-  public static void setInstance(FrameworkManager newInstance) {
-    instance = newInstance;
-  }
-
-  /**
-   * Framework metadata structures in MForm format
-   */
-  private MFramework mFramework;
-
-  /**
-   * Validator instance
-   */
-  private final Validator validator;
-
-  /**
-   * Upgrader instance
-   */
-  private final MetadataUpgrader upgrader;
-
-  /**
-   * Default framework auto upgrade option value
-   */
-  private static final boolean DEFAULT_AUTO_UPGRADE = false;
-
-  public static final String CURRENT_FRAMEWORK_VERSION = "1";
-
-  public Class getJobConfigurationClass() {
-      return JobConfiguration.class;
-  }
-
-  public Class getConnectionConfigurationClass() {
-      return ConnectionConfiguration.class;
-  }
-
-  public FrameworkManager() {
-    MConnectionForms connectionForms = new MConnectionForms(
-      FormUtils.toForms(getConnectionConfigurationClass())
-    );
-    mFramework = new MFramework(connectionForms, new MJobForms(FormUtils.toForms(getJobConfigurationClass())),
-        CURRENT_FRAMEWORK_VERSION);
-
-    // Build validator
-    validator = new FrameworkValidator();
-
-    // Build upgrader
-    upgrader = new FrameworkMetadataUpgrader();
-  }
-
-  public synchronized void initialize() {
-    initialize(SqoopConfiguration.getInstance().getContext().getBoolean(ConfigurationConstants.FRAMEWORK_AUTO_UPGRADE, DEFAULT_AUTO_UPGRADE));
-  }
-
-  public synchronized void initialize(boolean autoUpgrade) {
-    LOG.trace("Begin submission engine manager initialization");
-
-    // Register framework metadata in repository
-    mFramework = RepositoryManager.getInstance().getRepository().registerFramework(mFramework, autoUpgrade);
-
-    SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
-
-    LOG.info("Submission manager initialized: OK");
-  }
-
-  public  synchronized void destroy() {
-    LOG.trace("Begin submission engine manager destroy");
-  }
-
-  public Validator getValidator() {
-    return validator;
-  }
-
-  public MetadataUpgrader getMetadataUpgrader() {
-    return upgrader;
-  }
-
-  public MFramework getFramework() {
-    return mFramework;
-  }
-
-  public ResourceBundle getBundle(Locale locale) {
-    return ResourceBundle.getBundle(
-        FrameworkConstants.RESOURCE_BUNDLE_NAME, locale);
-  }
-
-  @Override
-  public void configurationChanged() {
-    LOG.info("Begin framework manager reconfiguring");
-    // If there are configuration options for FrameworkManager,
-    // implement the reconfiguration procedure right here.
-    LOG.info("Framework manager reconfigured");
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java
deleted file mode 100644
index 2437fa6..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.spi.MetadataUpgrader;
-import org.apache.sqoop.model.MConnectionForms;
-import org.apache.sqoop.model.MForm;
-import org.apache.sqoop.model.MInput;
-import org.apache.sqoop.model.MJobForms;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class FrameworkMetadataUpgrader extends MetadataUpgrader{
-
-  private static final Logger LOG = Logger.getLogger(FrameworkMetadataUpgrader.class);
-
-  @Override
-  public void upgrade(MConnectionForms original,
-    MConnectionForms upgradeTarget) {
-    doUpgrade(original.getForms(), upgradeTarget.getForms());
-  }
-
-  @Override
-  public void upgrade(MJobForms original, MJobForms upgradeTarget) {
-    doUpgrade(original.getForms(), upgradeTarget.getForms());
-
-  }
-
-  @SuppressWarnings("unchecked")
-  private void doUpgrade(List<MForm> original, List<MForm> target) {
-    // Easier to find the form in the original forms list if we use a map.
-    // Since the constructor of MJobForms takes a list,
-    // index is not guaranteed to be the same, so we need to look for
-    // equivalence
-    Map<String, MForm> formMap = new HashMap<String, MForm>();
-    for (MForm form : original) {
-      formMap.put(form.getName(), form);
-    }
-    for (MForm form : target) {
-      List<MInput<?>> inputs = form.getInputs();
-      MForm originalForm = formMap.get(form.getName());
-      if(originalForm == null) {
-        LOG.warn("Form: " + form.getName() + " not present in old " +
-          "framework metadata. So it will not be transferred by the upgrader.");
-        continue;
-      }
-
-      for (MInput input : inputs) {
-        try {
-          MInput originalInput = originalForm.getInput(input.getName());
-          input.setValue(originalInput.getValue());
-        } catch (SqoopException ex) {
-          LOG.warn("Input: " + input.getName() + " not present in old " +
-            "framework metadata. So it will not be transferred by the upgrader.");
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
deleted file mode 100644
index 46257f2..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.JobConfiguration;
-import org.apache.sqoop.framework.configuration.ThrottlingForm;
-import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.Validation;
-import org.apache.sqoop.validation.Validator;
-
-public class FrameworkValidator extends Validator {
-  @Override
-  public Validation validateConnection(Object connectionConfiguration) {
-    Validation validation = new Validation(ConnectionConfiguration.class);
-    // No validation on connection object
-    return validation;
-  }
-
-  @Override
-  public Validation validateJob(Object jobConfiguration) {
-    Validation validation = new Validation(JobConfiguration.class);
-    JobConfiguration conf = (JobConfiguration)jobConfiguration;
-    validateThrottlingForm(validation,conf.throttling);
-
-    return validation;
-  };
-
-  private void validateThrottlingForm(Validation validation, ThrottlingForm throttling) {
-    if(throttling.extractors != null && throttling.extractors < 1) {
-      validation.addMessage(Status.UNACCEPTABLE, "throttling", "extractors", "You need to specify more than one extractor");
-    }
-
-    if(throttling.loaders != null && throttling.loaders < 1) {
-      validation.addMessage(Status.UNACCEPTABLE, "throttling", "loaders", "You need to specify more than one loader");
-    }
-  }
-
-}