You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/10/10 05:07:04 UTC
[12/50] [abbrv] SQOOP-1497: Sqoop2: Entity Nomenclature Revisited
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
deleted file mode 100644
index 8149d1c..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ /dev/null
@@ -1,710 +0,0 @@
-
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.common.MapContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.ConnectorManager;
-import org.apache.sqoop.framework.configuration.JobConfiguration;
-import org.apache.sqoop.request.HttpEventContext;
-import org.apache.sqoop.connector.idf.IntermediateDataFormat;
-import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.core.Reconfigurable;
-import org.apache.sqoop.core.SqoopConfiguration;
-import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
-import org.apache.sqoop.job.etl.*;
-import org.apache.sqoop.model.FormUtils;
-import org.apache.sqoop.model.MConnection;
-import org.apache.sqoop.model.MJob;
-import org.apache.sqoop.model.MSubmission;
-import org.apache.sqoop.repository.Repository;
-import org.apache.sqoop.repository.RepositoryManager;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.submission.SubmissionStatus;
-import org.apache.sqoop.submission.counter.Counters;
-import org.apache.sqoop.utils.ClassUtils;
-import org.json.simple.JSONValue;
-
-import java.util.Date;
-import java.util.List;
-
-public class JobManager implements Reconfigurable {
- /**
- * Logger object.
- */
- private static final Logger LOG = Logger.getLogger(JobManager.class);
-
- /**
- * Private instance to singleton of this class.
- */
- private static JobManager instance;
- /**
- * Create default object by default.
- *
- * Every Sqoop server application needs one so this should not be performance
- * issue.
- */
- static {
- instance = new JobManager();
- }
-
- /**
- * Return current instance.
- *
- * @return Current instance
- */
- public static JobManager getInstance() {
- return instance;
- }
-
- /**
- * Allows to set instance in case that it's need.
- *
- * This method should not be normally used as the default instance should be
- * sufficient. One target user use case for this method are unit tests.
- *
- * @param newInstance
- * New instance
- */
- public static void setInstance(JobManager newInstance) {
- instance = newInstance;
- }
-
- /**
- * Default interval for purging old submissions from repository.
- */
- private static final long DEFAULT_PURGE_THRESHOLD = 24 * 60 * 60 * 1000;
-
- /**
- * Default sleep interval for purge thread.
- */
- private static final long DEFAULT_PURGE_SLEEP = 24 * 60 * 60 * 1000;
-
- /**
- * Default interval for update thread.
- */
- private static final long DEFAULT_UPDATE_SLEEP = 60 * 5 * 1000;
-
- /**
- * Configured submission engine instance
- */
- private SubmissionEngine submissionEngine;
-
- /**
- * Configured execution engine instance
- */
- private ExecutionEngine executionEngine;
-
- /**
- * Purge thread that will periodically remove old submissions from repository.
- */
- private PurgeThread purgeThread = null;
-
- /**
- * Update thread that will periodically check status of running submissions.
- */
- private UpdateThread updateThread = null;
-
- /**
- * Synchronization variable between threads.
- */
- private boolean running = true;
-
- /**
- * Specifies how old submissions should be removed from repository.
- */
- private long purgeThreshold;
-
- /**
- * Number of milliseconds for purge thread to sleep.
- */
- private long purgeSleep;
-
- /**
- * Number of milliseconds for update thread to slepp.
- */
- private long updateSleep;
-
- /**
- * Base notification URL.
- *
- * Framework manager will always add job id.
- */
- private String notificationBaseUrl;
-
- /**
- * Set notification base URL.
- *
- * @param url
- * Base URL
- */
- public void setNotificationBaseUrl(String url) {
- LOG.debug("Setting notification base URL to " + url);
- notificationBaseUrl = url;
- }
-
- /**
- * Get base notification url.
- *
- * @return String representation of the URL
- */
- public String getNotificationBaseUrl() {
- return notificationBaseUrl;
- }
-
- public synchronized void destroy() {
- LOG.trace("Begin submission engine manager destroy");
-
- running = false;
-
- try {
- purgeThread.interrupt();
- purgeThread.join();
- } catch (InterruptedException e) {
- // TODO(jarcec): Do I want to wait until it actually finish here?
- LOG.error("Interrupted joining purgeThread");
- }
-
- try {
- updateThread.interrupt();
- updateThread.join();
- } catch (InterruptedException e) {
- // TODO(jarcec): Do I want to wait until it actually finish here?
- LOG.error("Interrupted joining updateThread");
- }
-
- if (submissionEngine != null) {
- submissionEngine.destroy();
- }
-
- if (executionEngine != null) {
- executionEngine.destroy();
- }
- }
-
- public synchronized void initialize() {
- LOG.trace("Begin submission engine manager initialization");
- MapContext context = SqoopConfiguration.getInstance().getContext();
-
- // Let's load configured submission engine
- String submissionEngineClassName =
- context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
-
- submissionEngine = (SubmissionEngine) ClassUtils
- .instantiate(submissionEngineClassName);
- if (submissionEngine == null) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0001,
- submissionEngineClassName);
- }
-
- submissionEngine.initialize(context,
- FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
-
- // Execution engine
- String executionEngineClassName =
- context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
-
- executionEngine = (ExecutionEngine) ClassUtils
- .instantiate(executionEngineClassName);
- if (executionEngine == null) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0007,
- executionEngineClassName);
- }
-
- // We need to make sure that user has configured compatible combination of
- // submission engine and execution engine
- if (!submissionEngine
- .isExecutionEngineSupported(executionEngine.getClass())) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0008);
- }
-
- executionEngine.initialize(context,
- FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
-
- // Set up worker threads
- purgeThreshold = context.getLong(
- FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
- DEFAULT_PURGE_THRESHOLD
- );
- purgeSleep = context.getLong(
- FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
- DEFAULT_PURGE_SLEEP
- );
-
- purgeThread = new PurgeThread();
- purgeThread.start();
-
- updateSleep = context.getLong(
- FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
- DEFAULT_UPDATE_SLEEP
- );
-
- updateThread = new UpdateThread();
- updateThread.start();
-
- SqoopConfiguration.getInstance().getProvider()
- .registerListener(new CoreConfigurationListener(this));
-
- LOG.info("Submission manager initialized: OK");
- }
-
- public MSubmission submit(long jobId, HttpEventContext ctx) {
-
- MSubmission mSubmission = createJobSubmission(ctx, jobId);
- JobRequest jobRequest = createJobRequest(jobId, mSubmission);
- // Bootstrap job to execute
- prepareJob(jobRequest);
- // Make sure that this job id is not currently running and submit the job
- // only if it's not.
- synchronized (getClass()) {
- MSubmission lastSubmission = RepositoryManager.getInstance().getRepository()
- .findSubmissionLastForJob(jobId);
- if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0002, "Job with id " + jobId);
- }
- // TODO(Abe): Call multiple destroyers.
- // TODO(jarcec): We might need to catch all exceptions here to ensure
- // that Destroyer will be executed in all cases.
- // NOTE: the following is a blocking call
- boolean success = submissionEngine.submit(jobRequest);
- if (!success) {
- destroySubmission(jobRequest);
- mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
- }
- RepositoryManager.getInstance().getRepository().createSubmission(mSubmission);
- }
- return mSubmission;
- }
-
- private JobRequest createJobRequest(long jobId, MSubmission submission) {
- // get job
- MJob job = getJob(jobId);
-
- // get from/to connections for the job
- MConnection fromConnection = getConnection(job.getConnectionId(Direction.FROM));
- MConnection toConnection = getConnection(job.getConnectionId(Direction.TO));
-
- // get from/to connectors for the connection
- SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId());
- validateSupportedDirection(fromConnector, Direction.FROM);
- SqoopConnector toConnector = getConnector(toConnection.getConnectorId());
- validateSupportedDirection(toConnector, Direction.TO);
-
- // Transform config to fromConnector specific classes
- Object fromConnectionConfig = ClassUtils.instantiate(fromConnector
- .getConnectionConfigurationClass());
- FormUtils.fromForms(fromConnection.getConnectorPart().getForms(), fromConnectionConfig);
-
- // Transform config to toConnector specific classes
- Object toConnectorConfig = ClassUtils
- .instantiate(toConnector.getConnectionConfigurationClass());
- FormUtils.fromForms(toConnection.getConnectorPart().getForms(), toConnectorConfig);
-
- Object fromJob = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM));
- FormUtils.fromForms(job.getConnectorPart(Direction.FROM).getForms(), fromJob);
-
- Object toJob = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO));
- FormUtils.fromForms(job.getConnectorPart(Direction.TO).getForms(), toJob);
-
- // Transform framework specific configs
- // Q(VB) : Aren't the following 2 exactly the same?
- Object fromFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance()
- .getConnectionConfigurationClass());
- FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(), fromFrameworkConnection);
-
- Object toFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance()
- .getConnectionConfigurationClass());
- FormUtils.fromForms(toConnection.getFrameworkPart().getForms(), toFrameworkConnection);
-
- Object frameworkJob = ClassUtils.instantiate(FrameworkManager.getInstance()
- .getJobConfigurationClass());
- FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob);
-
- // Create a job request for submit/execution
- JobRequest jobRequest = executionEngine.createJobRequest();
- // Save important variables to the job request
- jobRequest.setSummary(submission);
- jobRequest.setConnector(Direction.FROM, fromConnector);
- jobRequest.setConnector(Direction.TO, toConnector);
- jobRequest.setConnectorConnectionConfig(Direction.FROM, fromConnectionConfig);
- jobRequest.setConnectorConnectionConfig(Direction.TO, toConnectorConfig);
- jobRequest.setConnectorJobConfig(Direction.FROM, fromJob);
- jobRequest.setConnectorJobConfig(Direction.TO, toJob);
- // TODO(Abe): Should we actually have 2 different Framework Connection config objects?
- jobRequest.setFrameworkConnectionConfig(Direction.FROM, fromFrameworkConnection);
- jobRequest.setFrameworkConnectionConfig(Direction.TO, toFrameworkConnection);
- jobRequest.setConfigFrameworkJob(frameworkJob);
- jobRequest.setJobName(job.getName());
- jobRequest.setJobId(job.getPersistenceId());
- jobRequest.setNotificationUrl(notificationBaseUrl + jobId);
- Class<? extends IntermediateDataFormat<?>> dataFormatClass =
- fromConnector.getIntermediateDataFormat();
- jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
-
-
- jobRequest.setFrom(fromConnector.getFrom());
- jobRequest.setTo(toConnector.getTo());
-
- addStandardJars(jobRequest);
- addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass);
- addConnectorInitializerJars(jobRequest, Direction.FROM);
- addConnectorInitializerJars(jobRequest, Direction.TO);
-
- Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM);
- Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO);
-
- // TODO(Gwen): Need better logic here once the Schema refactor: SQOOP-1378
- if (fromSchema != null) {
- jobRequest.getSummary().setFromSchema(fromSchema);
- }
- else {
- jobRequest.getSummary().setFromSchema(toSchema);
- }
- LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo());
- return jobRequest;
- }
-
- private void addConnectorJars(JobRequest jobRequest, SqoopConnector fromConnector,
- SqoopConnector toConnector, Class<? extends IntermediateDataFormat<?>> dataFormatClass) {
- jobRequest.addJarForClass(fromConnector.getClass());
- jobRequest.addJarForClass(toConnector.getClass());
- jobRequest.addJarForClass(dataFormatClass);
- }
-
- private void addStandardJars(JobRequest jobRequest) {
- // Let's register all important jars
- // sqoop-common
- jobRequest.addJarForClass(MapContext.class);
- // sqoop-core
- jobRequest.addJarForClass(FrameworkManager.class);
- // sqoop-spi
- jobRequest.addJarForClass(SqoopConnector.class);
- // Execution engine jar
- jobRequest.addJarForClass(executionEngine.getClass());
- // Extra libraries that Sqoop code requires
- jobRequest.addJarForClass(JSONValue.class);
- }
-
- MSubmission createJobSubmission(HttpEventContext ctx, long jobId) {
- MSubmission summary = new MSubmission(jobId);
- summary.setCreationUser(ctx.getUsername());
- summary.setLastUpdateUser(ctx.getUsername());
- return summary;
- }
-
- SqoopConnector getConnector(long connnectorId) {
- return ConnectorManager.getInstance().getConnector(connnectorId);
- }
-
- void validateSupportedDirection(SqoopConnector connector, Direction direction) {
- // Make sure that connector supports the given direction
- if (!connector.getSupportedDirections().contains(direction)) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0011, "Connector: "
- + connector.getClass().getCanonicalName());
- }
- }
-
- MConnection getConnection(long connectionId) {
- MConnection connection = RepositoryManager.getInstance().getRepository()
- .findConnection(connectionId);
- if (!connection.getEnabled()) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0010, "Connection id: "
- + connection.getPersistenceId());
- }
- return connection;
- }
-
- MJob getJob(long jobId) {
- MJob job = RepositoryManager.getInstance().getRepository().findJob(jobId);
- if (job == null) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0004, "Unknown job id: " + jobId);
- }
-
- if (!job.getEnabled()) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0009, "Job id: " + job.getPersistenceId());
- }
- return job;
- }
-
- private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction) {
-
- Initializer initializer = getConnectorInitializer(jobRequest, direction);
-
- // Initializer context
- InitializerContext initializerContext = getInitializerContext(jobRequest, direction);
-
- // Initialize submission from the connector perspective
- initializer.initialize(initializerContext, jobRequest.getConnectorConnectionConfig(direction),
- jobRequest.getConnectorJobConfig(direction));
-
- // TODO(Abe): Alter behavior of Schema here.
- return initializer.getSchema(initializerContext,
- jobRequest.getConnectorConnectionConfig(direction),
- jobRequest.getConnectorJobConfig(direction));
- }
-
- private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) {
-
- Initializer initializer = getConnectorInitializer(jobRequest, direction);
- InitializerContext initializerContext = getInitializerContext(jobRequest, direction);
- // Add job specific jars to
- jobRequest.addJars(initializer.getJars(initializerContext,
- jobRequest.getConnectorConnectionConfig(direction),
- jobRequest.getConnectorJobConfig(direction)));
- }
-
- private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
- Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo();
- Class<? extends Initializer> initializerClass = transferable.getInitializer();
- Initializer initializer = (Initializer) ClassUtils.instantiate(initializerClass);
-
- if (initializer == null) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0006,
- "Can't create connector initializer instance: " + initializerClass.getName());
- }
- return initializer;
- }
-
- private InitializerContext getInitializerContext(JobRequest jobRequest, Direction direction) {
- return new InitializerContext(jobRequest.getConnectorContext(direction));
- }
-
- void prepareJob(JobRequest request) {
- JobConfiguration jobConfiguration = (JobConfiguration) request.getConfigFrameworkJob();
- // We're directly moving configured number of extractors and loaders to
- // underlying request object. In the future we might need to throttle this
- // count based on other running jobs to meet our SLAs.
- request.setExtractors(jobConfiguration.throttling.extractors);
- request.setLoaders(jobConfiguration.throttling.loaders);
-
- // Delegate rest of the job to execution engine
- executionEngine.prepareJob(request);
- }
-
- /**
- * Callback that will be called only if we failed to submit the job to the
- * remote cluster.
- */
- void destroySubmission(JobRequest request) {
- Transferable from = request.getFrom();
- Transferable to = request.getTo();
-
- Class<? extends Destroyer> fromDestroyerClass = from.getDestroyer();
- Class<? extends Destroyer> toDestroyerClass = to.getDestroyer();
- Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromDestroyerClass);
- Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toDestroyerClass);
-
- if (fromDestroyer == null) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0006,
- "Can't create toDestroyer instance: " + fromDestroyerClass.getName());
- }
-
- if (toDestroyer == null) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0006,
- "Can't create toDestroyer instance: " + toDestroyerClass.getName());
- }
-
- // TODO(Abe): Update context to manage multiple connectors. As well as summary.
- DestroyerContext fromDestroyerContext = new DestroyerContext(
- request.getConnectorContext(Direction.FROM), false, request.getSummary()
- .getFromSchema());
- DestroyerContext toDestroyerContext = new DestroyerContext(
- request.getConnectorContext(Direction.TO), false, request.getSummary()
- .getToSchema());
-
- // destroy submission from connector perspective
- fromDestroyer.destroy(fromDestroyerContext, request.getConnectorConnectionConfig(Direction.FROM),
- request.getConnectorJobConfig(Direction.FROM));
- toDestroyer.destroy(toDestroyerContext, request.getConnectorConnectionConfig(Direction.TO),
- request.getConnectorJobConfig(Direction.TO));
- }
-
- public MSubmission stop(long jobId, HttpEventContext ctx) {
-
- Repository repository = RepositoryManager.getInstance().getRepository();
- MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
-
- if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0003, "Job with id " + jobId
- + " is not running");
- }
- submissionEngine.stop(mSubmission.getExternalId());
-
- mSubmission.setLastUpdateUser(ctx.getUsername());
-
- // Fetch new information to verify that the stop command has actually worked
- update(mSubmission);
-
- // Return updated structure
- return mSubmission;
- }
-
- public MSubmission status(long jobId) {
- Repository repository = RepositoryManager.getInstance().getRepository();
- MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
-
- if (mSubmission == null) {
- return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
- }
-
- // If the submission is in running state, let's update it
- if (mSubmission.getStatus().isRunning()) {
- update(mSubmission);
- }
-
- return mSubmission;
- }
-
- private void update(MSubmission submission) {
- double progress = -1;
- Counters counters = null;
- String externalId = submission.getExternalId();
- SubmissionStatus newStatus = submissionEngine.status(externalId);
- String externalLink = submissionEngine.externalLink(externalId);
-
- if (newStatus.isRunning()) {
- progress = submissionEngine.progress(externalId);
- } else {
- counters = submissionEngine.counters(externalId);
- }
-
- submission.setStatus(newStatus);
- submission.setProgress(progress);
- submission.setCounters(counters);
- submission.setExternalLink(externalLink);
- submission.setLastUpdateDate(new Date());
-
- RepositoryManager.getInstance().getRepository()
- .updateSubmission(submission);
- }
-
- @Override
- public synchronized void configurationChanged() {
- LOG.info("Begin submission engine manager reconfiguring");
- MapContext newContext = SqoopConfiguration.getInstance().getContext();
- MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
-
- String newSubmissionEngineClassName = newContext
- .getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
- if (newSubmissionEngineClassName == null
- || newSubmissionEngineClassName.trim().length() == 0) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0001,
- newSubmissionEngineClassName);
- }
-
- String oldSubmissionEngineClassName = oldContext
- .getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
- if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) {
- LOG.warn("Submission engine cannot be replaced at the runtime. " +
- "You might need to restart the server.");
- }
-
- String newExecutionEngineClassName = newContext
- .getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
- if (newExecutionEngineClassName == null
- || newExecutionEngineClassName.trim().length() == 0) {
- throw new SqoopException(FrameworkError.FRAMEWORK_0007,
- newExecutionEngineClassName);
- }
-
- String oldExecutionEngineClassName = oldContext
- .getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
- if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) {
- LOG.warn("Execution engine cannot be replaced at the runtime. " +
- "You might need to restart the server.");
- }
-
- // Set up worker threads
- purgeThreshold = newContext.getLong(
- FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
- DEFAULT_PURGE_THRESHOLD
- );
- purgeSleep = newContext.getLong(
- FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
- DEFAULT_PURGE_SLEEP
- );
- purgeThread.interrupt();
-
- updateSleep = newContext.getLong(
- FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
- DEFAULT_UPDATE_SLEEP
- );
- updateThread.interrupt();
-
- LOG.info("Submission engine manager reconfigured.");
- }
-
- private class PurgeThread extends Thread {
- public PurgeThread() {
- super("PurgeThread");
- }
-
- public void run() {
- LOG.info("Starting submission manager purge thread");
-
- while (running) {
- try {
- LOG.info("Purging old submissions");
- Date threshold = new Date((new Date()).getTime() - purgeThreshold);
- RepositoryManager.getInstance().getRepository()
- .purgeSubmissions(threshold);
- Thread.sleep(purgeSleep);
- } catch (InterruptedException e) {
- LOG.debug("Purge thread interrupted", e);
- }
- }
-
- LOG.info("Ending submission manager purge thread");
- }
- }
-
- private class UpdateThread extends Thread {
- public UpdateThread() {
- super("UpdateThread");
- }
-
- public void run() {
- LOG.info("Starting submission manager update thread");
-
- while (running) {
- try {
- LOG.debug("Updating running submissions");
-
- // Let's get all running submissions from repository to check them out
- List<MSubmission> unfinishedSubmissions =
- RepositoryManager.getInstance().getRepository()
- .findSubmissionsUnfinished();
-
- for (MSubmission submission : unfinishedSubmissions) {
- update(submission);
- }
-
- Thread.sleep(updateSleep);
- } catch (InterruptedException e) {
- LOG.debug("Purge thread interrupted", e);
- }
- }
-
- LOG.info("Ending submission manager update thread");
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/JobRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobRequest.java b/core/src/main/java/org/apache/sqoop/framework/JobRequest.java
deleted file mode 100644
index 1f77693..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/JobRequest.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.common.DirectionError;
-import org.apache.sqoop.common.MutableMapContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.idf.IntermediateDataFormat;
-import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.job.etl.Transferable;
-import org.apache.sqoop.model.MSubmission;
-import org.apache.sqoop.utils.ClassUtils;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Submission details class is used when creating new submission and contains
- * all information that we need to create a new submission (including mappers,
- * reducers, ...).
- */
-public class JobRequest {
-
- /**
- * Submission summary
- */
- MSubmission summary;
-
- /**
- * Original job name
- */
- String jobName;
-
- /**
- * Associated job (from metadata perspective) id
- */
- long jobId;
-
- /**
- * Connector instances associated with this submission request
- */
- SqoopConnector fromConnector;
- SqoopConnector toConnector;
-
- /**
- * List of required local jars for the job
- */
- List<String> jars;
-
- /**
- * From entity
- */
- Transferable from;
-
- /**
- * To entity
- */
- Transferable to;
-
- /**
- * All configuration objects
- */
- Object fromConnectorConnectionConfig;
- Object toConnectorConnectionConfig;
- Object fromConnectorJobConfig;
- Object toConnectorJobConfig;
- Object fromFrameworkConnectionConfig;
- Object toFrameworkConnectionConfig;
- Object configFrameworkJob;
-
- /**
- * Connector context (submission specific configuration)
- */
- MutableMapContext fromConnectorContext;
- MutableMapContext toConnectorContext;
-
- /**
- * Framework context (submission specific configuration)
- */
- MutableMapContext frameworkContext;
-
- /**
- * Optional notification URL for job progress
- */
- String notificationUrl;
-
- /**
- * Number of extractors
- */
- Integer extractors;
-
- /**
- * Number of loaders
- */
- Integer loaders;
-
- /**
- * The intermediate data format this submission should use.
- */
- Class<? extends IntermediateDataFormat> intermediateDataFormat;
-
- public JobRequest() {
- this.jars = new LinkedList<String>();
- this.fromConnectorContext = new MutableMapContext();
- this.toConnectorContext = new MutableMapContext();
- this.frameworkContext = new MutableMapContext();
- this.fromConnector = null;
- this.toConnector = null;
- this.fromConnectorConnectionConfig = null;
- this.toConnectorConnectionConfig = null;
- this.fromConnectorJobConfig = null;
- this.toConnectorJobConfig = null;
- this.fromFrameworkConnectionConfig = null;
- this.toFrameworkConnectionConfig = null;
- }
-
- public MSubmission getSummary() {
- return summary;
- }
-
- public void setSummary(MSubmission summary) {
- this.summary = summary;
- }
-
- public String getJobName() {
- return jobName;
- }
-
- public void setJobName(String jobName) {
- this.jobName = jobName;
- }
-
- public long getJobId() {
- return jobId;
- }
-
- public void setJobId(long jobId) {
- this.jobId = jobId;
- }
-
- public SqoopConnector getConnector(Direction type) {
- switch(type) {
- case FROM:
- return fromConnector;
-
- case TO:
- return toConnector;
-
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public void setConnector(Direction type, SqoopConnector connector) {
- switch(type) {
- case FROM:
- fromConnector = connector;
- break;
-
- case TO:
- toConnector = connector;
- break;
-
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public List<String> getJars() {
- return jars;
- }
-
- public void addJar(String jar) {
- if(!jars.contains(jar)) {
- jars.add(jar);
- }
- }
-
- public void addJarForClass(Class klass) {
- addJar(ClassUtils.jarForClass(klass));
- }
-
- public void addJars(List<String> jars) {
- for(String j : jars) {
- addJar(j);
- }
- }
-
- public Transferable getFrom() {
- return from;
- }
-
- public void setFrom(Transferable from) {
- this.from = from;
- }
-
- public Transferable getTo() {
- return to;
- }
-
- public void setTo(Transferable to) {
- this.to = to;
- }
-
- public Object getConnectorConnectionConfig(Direction type) {
- switch(type) {
- case FROM:
- return fromConnectorConnectionConfig;
-
- case TO:
- return toConnectorConnectionConfig;
-
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public void setConnectorConnectionConfig(Direction type, Object config) {
- switch(type) {
- case FROM:
- fromConnectorConnectionConfig = config;
- break;
- case TO:
- toConnectorConnectionConfig = config;
- break;
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public Object getConnectorJobConfig(Direction type) {
- switch(type) {
- case FROM:
- return fromConnectorJobConfig;
-
- case TO:
- return toConnectorJobConfig;
-
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public void setConnectorJobConfig(Direction type, Object config) {
- switch(type) {
- case FROM:
- fromConnectorJobConfig = config;
- break;
- case TO:
- toConnectorJobConfig = config;
- break;
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public Object getFrameworkConnectionConfig(Direction type) {
- switch(type) {
- case FROM:
- return fromFrameworkConnectionConfig;
-
- case TO:
- return toFrameworkConnectionConfig;
-
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public void setFrameworkConnectionConfig(Direction type, Object config) {
- switch(type) {
- case FROM:
- fromFrameworkConnectionConfig = config;
- break;
- case TO:
- toFrameworkConnectionConfig = config;
- break;
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public Object getConfigFrameworkJob() {
- return configFrameworkJob;
- }
-
- public void setConfigFrameworkJob(Object config) {
- configFrameworkJob = config;
- }
-
- public MutableMapContext getConnectorContext(Direction type) {
- switch(type) {
- case FROM:
- return fromConnectorContext;
-
- case TO:
- return toConnectorContext;
-
- default:
- throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
- }
- }
-
- public MutableMapContext getFrameworkContext() {
- return frameworkContext;
- }
-
- public String getNotificationUrl() {
- return notificationUrl;
- }
-
- public void setNotificationUrl(String url) {
- this.notificationUrl = url;
- }
-
- public Integer getExtractors() {
- return extractors;
- }
-
- public void setExtractors(Integer extractors) {
- this.extractors = extractors;
- }
-
- public Integer getLoaders() {
- return loaders;
- }
-
- public void setLoaders(Integer loaders) {
- this.loaders = loaders;
- }
-
- public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() {
- return intermediateDataFormat;
- }
-
- public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) {
- this.intermediateDataFormat = intermediateDataFormat;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
deleted file mode 100644
index 732be3b..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.common.MapContext;
-import org.apache.sqoop.submission.counter.Counters;
-import org.apache.sqoop.submission.SubmissionStatus;
-
-/**
- * Submission engine is responsible in conveying the information about the
- * job instances (submissions) to remote (hadoop) cluster.
- */
-public abstract class SubmissionEngine {
-
- /**
- * Initialize submission engine
- *
- * @param context Configuration context
- * @param prefix Submission engine prefix
- */
- public void initialize(MapContext context, String prefix) {
- }
-
- /**
- * Destroy submission engine when stopping server
- */
- public void destroy() {
- }
-
- /**
- * Callback to verify that configured submission engine and execution engine
- * are compatible.
- *
- * @param executionEngineClass Configured execution class.
- * @return True if such execution engine is supported
- */
- public abstract boolean isExecutionEngineSupported(Class executionEngineClass);
-
- /**
- * Submit new job to remote (hadoop) cluster. This method *must* fill
- * submission.getSummary.setExternalId(), otherwise Sqoop framework won't
- * be able to track progress on this job!
- *
- * @return Return true if we were able to submit job to remote cluster.
- */
- public abstract boolean submit(JobRequest submission);
-
- /**
- * Hard stop for given submission.
- *
- * @param submissionId Submission internal id.
- */
- public abstract void stop(String submissionId);
-
- /**
- * Return status of given submission.
- *
- * @param submissionId Submission internal id.
- * @return Current submission status.
- */
- public abstract SubmissionStatus status(String submissionId);
-
- /**
- * Return submission progress.
- *
- * Expected is number from interval <0, 1> denoting how far the processing
- * has gone or -1 in case that this submission engine do not supports
- * progress reporting.
- *
- * @param submissionId Submission internal id.
- * @return {-1} union <0, 1>
- */
- public double progress(String submissionId) {
- return -1;
- }
-
- /**
- * Return statistics for given submission id.
- *
- * Sqoop framework will call counters only for submission in state SUCCEEDED,
- * it's consider exceptional state to call this method for other states.
- *
- * @param submissionId Submission internal id.
- * @return Submission statistics
- */
- public Counters counters(String submissionId) {
- return null;
- }
-
- /**
- * Return link to external web page with given submission.
- *
- * @param submissionId Submission internal id.
- * @return Null in case that external page is not supported or available or
- * HTTP link to given submission.
- */
- public String externalLink(String submissionId) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/configuration/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ConnectionConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ConnectionConfiguration.java
deleted file mode 100644
index 897d3c7..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/ConnectionConfiguration.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-/**
- * Framework class representing connection configuration
- */
-@ConfigurationClass
-public class ConnectionConfiguration {
-
- @Form SecurityForm security = new SecurityForm();
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
deleted file mode 100644
index 0abc611..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-@ConfigurationClass
-public class JobConfiguration {
- @Form
- public ThrottlingForm throttling;
-
- public JobConfiguration() {
- throttling = new ThrottlingForm();
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/configuration/SecurityForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/SecurityForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/SecurityForm.java
deleted file mode 100644
index 8ab50ed..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/SecurityForm.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- * Security form
- */
-@FormClass
-public class SecurityForm {
- @Input public Integer maxConnections;
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java
deleted file mode 100644
index c435f6b..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- * Form to set up number of loaders and extractors
- */
-@FormClass
-public class ThrottlingForm {
-
- @Input public Integer extractors;
-
- @Input public Integer loaders;
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index fa119a5..3466116 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -23,9 +23,9 @@ import java.util.List;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MConnector;
-import org.apache.sqoop.model.MFramework;
+import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MSubmission;
@@ -43,13 +43,13 @@ public class JdbcRepository extends Repository {
}
/**
- * Private interface to wrap specific code that requires fresh connection to
- * repository with general code that will get the connection and handle
+ * Private interface to wrap specific code that requires fresh link to
+ * repository with general code that will get the link and handle
* exceptions.
*/
private interface DoWithConnection {
/**
- * Do what is needed to be done with given connection object.
+ * Do what is needed to be done with given link object.
*
* @param conn Connection to metadata repository.
* @return Arbitrary value
@@ -62,7 +62,7 @@ public class JdbcRepository extends Repository {
}
/**
- * Handle transaction and connection functionality and delegate action to
+ * Handle transaction and link functionality and delegate action to
* given delegator.
*
* @param delegator Code for specific action
@@ -77,7 +77,7 @@ public class JdbcRepository extends Repository {
boolean shouldCloseTxn = false;
try {
- // Get transaction and connection
+ // Get transaction and link
Connection conn;
if (tx == null) {
tx = getTransaction();
@@ -205,6 +205,7 @@ public class JdbcRepository extends Repository {
/**
* {@inheritDoc}
*/
+ @SuppressWarnings("unchecked")
@Override
public List<MConnector> findConnectors() {
return (List<MConnector>) doWithConnection(new DoWithConnection() {
@@ -219,24 +220,24 @@ public class JdbcRepository extends Repository {
* {@inheritDoc}
*/
@Override
- public MFramework registerFramework(final MFramework mFramework, final boolean autoUpgrade) {
- return (MFramework) doWithConnection(new DoWithConnection() {
+ public MDriverConfig registerDriverConfig(final MDriverConfig mDriverConfig, final boolean autoUpgrade) {
+ return (MDriverConfig) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
- MFramework result = handler.findFramework(conn);
+ MDriverConfig result = handler.findDriverConfig(conn);
if (result == null) {
- handler.registerFramework(mFramework, conn);
- return mFramework;
+ handler.registerDriverConfig(mDriverConfig, conn);
+ return mDriverConfig;
} else {
- // We're currently not serializing framework version into repository
+ // We're currently not serializing version into repository
// so let's just compare the structure to see if we need upgrade.
- if(!mFramework.equals(result)) {
+ if(!mDriverConfig.equals(result)) {
if (autoUpgrade) {
- upgradeFramework(mFramework);
- return mFramework;
+ upgradeDriverConfig(mDriverConfig);
+ return mDriverConfig;
} else {
throw new SqoopException(RepositoryError.JDBCREPO_0026,
- "Framework: " + mFramework.getPersistenceId());
+ "DriverConfig: " + mDriverConfig.getPersistenceId());
}
}
return result;
@@ -249,15 +250,15 @@ public class JdbcRepository extends Repository {
* {@inheritDoc}
*/
@Override
- public void createConnection(final MConnection connection) {
+ public void createLink(final MLink link) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
- if(connection.hasPersistenceId()) {
+ if(link.hasPersistenceId()) {
throw new SqoopException(RepositoryError.JDBCREPO_0015);
}
- handler.createConnection(connection, conn);
+ handler.createLink(link, conn);
return null;
}
});
@@ -267,28 +268,27 @@ public class JdbcRepository extends Repository {
* {@inheritDoc}
*/
@Override
- public void updateConnection(final MConnection connection) {
- updateConnection(connection, null);
+ public void updateLink(final MLink link) {
+ updateLink(link, null);
}
/**
* {@inheritDoc}
*/
@Override
- public void updateConnection(final MConnection connection,
- RepositoryTransaction tx) {
+ public void updateLink(final MLink link, RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
- if(!connection.hasPersistenceId()) {
+ if (!link.hasPersistenceId()) {
throw new SqoopException(RepositoryError.JDBCREPO_0016);
}
- if(!handler.existsConnection(connection.getPersistenceId(), conn)) {
- throw new SqoopException(RepositoryError.JDBCREPO_0017,
- "Invalid id: " + connection.getPersistenceId());
+ if (!handler.existsLink(link.getPersistenceId(), conn)) {
+ throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid id: "
+ + link.getPersistenceId());
}
- handler.updateConnection(connection, conn);
+ handler.updateLink(link, conn);
return null;
}
}, (JdbcRepositoryTransaction) tx);
@@ -298,16 +298,16 @@ public class JdbcRepository extends Repository {
* {@inheritDoc}
*/
@Override
- public void enableConnection(final long connectionId, final boolean enabled) {
+ public void enableLink(final long linkId, final boolean enabled) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
- if(!handler.existsConnection(connectionId, conn)) {
+ if(!handler.existsLink(linkId, conn)) {
throw new SqoopException(RepositoryError.JDBCREPO_0017,
- "Invalid id: " + connectionId);
+ "Invalid id: " + linkId);
}
- handler.enableConnection(connectionId, enabled, conn);
+ handler.enableLink(linkId, enabled, conn);
return null;
}
});
@@ -317,20 +317,20 @@ public class JdbcRepository extends Repository {
* {@inheritDoc}
*/
@Override
- public void deleteConnection(final long connectionId) {
+ public void deleteLink(final long linkId) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
- if(!handler.existsConnection(connectionId, conn)) {
+ if(!handler.existsLink(linkId, conn)) {
throw new SqoopException(RepositoryError.JDBCREPO_0017,
- "Invalid id: " + connectionId);
+ "Invalid id: " + linkId);
}
- if(handler.inUseConnection(connectionId, conn)) {
+ if(handler.inUseLink(linkId, conn)) {
throw new SqoopException(RepositoryError.JDBCREPO_0021,
- "Id in use: " + connectionId);
+ "Id in use: " + linkId);
}
- handler.deleteConnection(connectionId, conn);
+ handler.deleteLink(linkId, conn);
return null;
}
});
@@ -340,11 +340,11 @@ public class JdbcRepository extends Repository {
* {@inheritDoc}
*/
@Override
- public MConnection findConnection(final long connectionId) {
- return (MConnection) doWithConnection(new DoWithConnection() {
+ public MLink findLink(final long connectionId) {
+ return (MLink) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
- return handler.findConnection(connectionId, conn);
+ return handler.findLink(connectionId, conn);
}
});
}
@@ -354,11 +354,11 @@ public class JdbcRepository extends Repository {
*/
@SuppressWarnings("unchecked")
@Override
- public List<MConnection> findConnections() {
- return (List<MConnection>) doWithConnection(new DoWithConnection() {
+ public List<MLink> findLinks() {
+ return (List<MLink>) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
- return handler.findConnections(conn);
+ return handler.findLinks(conn);
}
});
}
@@ -601,12 +601,12 @@ public class JdbcRepository extends Repository {
* {@inheritDoc}
*/
@Override
- public List<MConnection> findConnectionsForConnector(final long
+ public List<MLink> findLinksForConnector(final long
connectorID) {
- return (List<MConnection>) doWithConnection(new DoWithConnection() {
+ return (List<MLink>) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
- return handler.findConnectionsForConnector(connectorID, conn);
+ return handler.findLinksForConnector(connectorID, conn);
}
});
}
@@ -637,12 +637,11 @@ public class JdbcRepository extends Repository {
}
@Override
- protected void deleteConnectionInputs(final long connectionID,
- RepositoryTransaction tx) {
+ protected void deleteLinkInputs(final long linkId, RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
- handler.deleteConnectionInputs(connectionID, conn);
+ handler.deleteLinkInputs(linkId, conn);
return null;
}
}, (JdbcRepositoryTransaction) tx);
@@ -665,12 +664,11 @@ public class JdbcRepository extends Repository {
}
- protected void updateFramework(final MFramework mFramework,
- RepositoryTransaction tx) {
+ protected void updateDriverConfig(final MDriverConfig mDriverConfig, RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
- handler.updateFramework(mFramework, conn);
+ handler.updateDriverConfig(mDriverConfig, conn);
return null;
}
}, (JdbcRepositoryTransaction) tx);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index 4de3134..a743491 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -21,9 +21,9 @@ import java.sql.Connection;
import java.util.Date;
import java.util.List;
-import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MConnector;
-import org.apache.sqoop.model.MFramework;
+import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MSubmission;
@@ -41,11 +41,10 @@ public abstract class JdbcRepositoryHandler {
/**
* Search for connector with given name in repository.
- *
- * And return corresponding metadata structure.
+ * And return corresponding connector structure.
*
* @param shortName Connector unique name
- * @param conn JDBC connection for querying repository.
+ * @param conn JDBC link for querying repository.
* @return null if connector is not yet registered in repository or
* loaded representation.
*/
@@ -65,26 +64,26 @@ public abstract class JdbcRepositoryHandler {
* already registered or present in the repository.
*
* @param mc Connector that should be registered.
- * @param conn JDBC connection for querying repository.
+ * @param conn JDBC link for querying repository.
*/
public abstract void registerConnector(MConnector mc, Connection conn);
/**
- * Retrieve connections which use the given connector.
- * @param connectorID Connector ID whose connections should be fetched
- * @param conn JDBC connection for querying repository
- * @return List of MConnections that use <code>connectorID</code>.
+ * Retrieve links which use the given connector.
+ * @param connectorID Connector ID whose links should be fetched
+ * @param conn JDBC link for querying repository
+ * @return List of MLinks that use <code>connectorID</code>.
*/
- public abstract List<MConnection> findConnectionsForConnector(long
+ public abstract List<MLink> findLinksForConnector(long
connectorID, Connection conn);
/**
- * Retrieve jobs which use the given connection.
+ * Retrieve jobs which use the given link.
*
* @param connectorID Connector ID whose jobs should be fetched
- * @param conn JDBC connection for querying repository
- * @return List of MJobs that use <code>connectionID</code>.
+ * @param conn JDBC link for querying repository
+ * @return List of MJobs that use <code>linkID</code>.
*/
public abstract List<MJob> findJobsForConnector(long connectorID,
Connection conn);
@@ -99,47 +98,47 @@ public abstract class JdbcRepositoryHandler {
*
* @param mConnector The new data to be inserted into the repository for
* this connector.
- * @param conn JDBC connection for querying repository
+ * @param conn JDBC link for querying repository
*/
public abstract void updateConnector(MConnector mConnector, Connection conn);
/**
- * Update the framework with the new data supplied in the
- * <tt>mFramework</tt>.
+ * Update the driverConfig with the new data supplied in the
+ * <tt>mDriverConfig</tt>.
* Also Update all forms in the repository
- * with the forms specified in <tt>mFramework</tt>. <tt>mFramework </tt> must
+ * with the forms specified in <tt>mDriverConfig</tt>. <tt>mDriverConfig </tt> must
* minimally have the connectorID and all required forms (including ones
* which may not have changed). After this operation the repository is
* guaranteed to only have the new forms specified in this object.
*
- * @param mFramework The new data to be inserted into the repository for
- * the framework.
- * @param conn JDBC connection for querying repository
+ * @param mDriverConfig The new data to be inserted into the repository for
+ * the driverConfig.
+ * @param conn JDBC link for querying repository
*/
- public abstract void updateFramework(MFramework mFramework, Connection conn);
+ public abstract void updateDriverConfig(MDriverConfig mDriverConfig, Connection conn);
/**
- * Search for framework metadata in the repository.
+ * Search for driverConfigin the repository.
*
- * @param conn JDBC connection for querying repository.
- * @return null if framework metadata are not yet present in repository or
+ * @param conn JDBC link for querying repository.
+ * @return null if driverConfig are not yet present in repository or
* loaded representation.
*/
- public abstract MFramework findFramework(Connection conn);
+ public abstract MDriverConfig findDriverConfig(Connection conn);
/**
- * Register framework metadata in repository.
+ * Register driver config in repository.
*
- * Save framework metadata into repository. Metadata should not be already
+ * Save driver config into repository. Driver config should not be already
* registered or present in the repository.
*
- * @param mf Framework metadata that should be registered.
- * @param conn JDBC connection for querying repository.
+ * @param driverConfig Driver config that should be registered.
+ * @param conn JDBC link for querying repository.
*/
- public abstract void registerFramework(MFramework mf, Connection conn);
+ public abstract void registerDriverConfig(MDriverConfig driverConfig, Connection conn);
/**
* Return true if repository tables exists and are suitable for use.
@@ -169,95 +168,92 @@ public abstract class JdbcRepositoryHandler {
public abstract void shutdown();
/**
- * Specify query that Sqoop framework can use to validate connection to
+ * Specify query that Sqoop can use to validate link to
* repository. This query should return at least one row.
*
* @return Query or NULL in case that this repository do not support or do not
- * want to validate live connections.
+ * want to validate live links.
*/
public abstract String validationQuery();
/**
- * Save given connection to repository. This connection must not be already
+ * Save given link to repository. This link must not be already
* present in the repository otherwise exception will be thrown.
*
- * @param connection Connection object to serialize into repository.
- * @param conn Connection to metadata repository
+ * @param link Link object to serialize into repository.
+ * @param conn Connection to the repository
*/
- public abstract void createConnection(MConnection connection,
- Connection conn);
+ public abstract void createLink(MLink link, Connection conn);
/**
- * Update given connection representation in repository. This connection
+ * Update given link representation in repository. This link
* object must already exists in the repository otherwise exception will be
* thrown.
*
- * @param connection Connection object that should be updated in repository.
- * @param conn Connection to metadata repository
+ * @param link Link object that should be updated in repository.
+ * @param conn Connection to the repository
*/
- public abstract void updateConnection(MConnection connection,
- Connection conn);
+ public abstract void updateLink(MLink link, Connection conn);
/**
- * Check if given connection exists in metastore.
+ * Check if given link exists in repository.
*
- * @param connetionId Connection id
- * @param conn Connection to metadata repository
- * @return True if the connection exists
+ * @param linkId Link id
+ * @param conn Connection to the repository
+ * @return True if the link exists
*/
- public abstract boolean existsConnection(long connetionId, Connection conn);
+ public abstract boolean existsLink(long linkId, Connection conn);
/**
* Check if given Connection id is referenced somewhere and thus can't
* be removed.
*
- * @param connectionId Connection id
- * @param conn Connection to metadata repository
+ * @param linkId Link id
+ * @param conn Connection to the repository
* @return
*/
- public abstract boolean inUseConnection(long connectionId, Connection conn);
+ public abstract boolean inUseLink(long linkId, Connection conn);
/**
- * Enable or disable connection with given id from metadata repository
+ * Enable or disable link with given id from the repository
*
- * @param connectionId Connection object that is going to be enabled or disabled
+ * @param linkId Link object that is going to be enabled or disabled
* @param enabled Enable or disable
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
*/
- public abstract void enableConnection(long connectionId, boolean enabled, Connection conn);
+ public abstract void enableLink(long linkId, boolean enabled, Connection conn);
/**
- * Delete connection with given id from metadata repository.
+ * Delete link with given id from the repository.
*
- * @param connectionId Connection object that should be removed from repository
- * @param conn Connection to metadata repository
+ * @param linkId Link object that should be removed from repository
+ * @param conn Connection to the repository
*/
- public abstract void deleteConnection(long connectionId, Connection conn);
+ public abstract void deleteLink(long linkId, Connection conn);
/**
- * Delete the input values for the connection with given id from the
+ * Delete the input values for the link with given id from the
* repository.
- * @param id Connection object whose inputs should be removed from repository
- * @param conn Connection to metadata repository
+ * @param id Link object whose inputs should be removed from repository
+ * @param conn Connection to the repository
*/
- public abstract void deleteConnectionInputs(long id, Connection conn);
+ public abstract void deleteLinkInputs(long id, Connection conn);
/**
- * Find connection with given id in repository.
+ * Find link with given id in repository.
*
- * @param connectionId Connection id
- * @param conn Connection to metadata repository
- * @return Deserialized form of the connection that is saved in repository
+ * @param linkId Link id
+ * @param conn Connection to the repository
+ * @return Deserialized form of the link that is saved in repository
*/
- public abstract MConnection findConnection(long connectionId,
- Connection conn);
+ public abstract MLink findLink(long linkId, Connection conn);
/**
- * Get all connection objects.
+ * Get all link objects.
*
- * @param conn Connection to metadata repository
- * @return List will all saved connection objects
+ * @param conn Connection to the repository
+ * @return List will all saved link objects
*/
- public abstract List<MConnection> findConnections(Connection conn);
+ public abstract List<MLink> findLinks(Connection conn);
/**
@@ -265,7 +261,7 @@ public abstract class JdbcRepositoryHandler {
* present in the repository otherwise exception will be thrown.
*
* @param job Job object to serialize into repository.
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
*/
public abstract void createJob(MJob job, Connection conn);
@@ -275,15 +271,15 @@ public abstract class JdbcRepositoryHandler {
* thrown.
*
* @param job Job object that should be updated in repository.
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
*/
public abstract void updateJob(MJob job, Connection conn);
/**
- * Check if given job exists in metastore.
+ * Check if given job exists in the repository.
*
* @param jobId Job id
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
* @return True if the job exists
*/
public abstract boolean existsJob(long jobId, Connection conn);
@@ -293,7 +289,7 @@ public abstract class JdbcRepositoryHandler {
* be removed.
*
* @param jobId Job id
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
* @return
*/
public abstract boolean inUseJob(long jobId, Connection conn);
@@ -303,22 +299,22 @@ public abstract class JdbcRepositoryHandler {
*
* @param jobId Job id
* @param enabled Enable or disable
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
*/
public abstract void enableJob(long jobId, boolean enabled, Connection conn);
/**
* Delete the input values for the job with given id from the repository.
* @param id Job object whose inputs should be removed from repository
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
*/
public abstract void deleteJobInputs(long id, Connection conn);
/**
- * Delete job with given id from metadata repository. This method will
+ * Delete job with given id from the repository. This method will
* delete all inputs for this job also.
*
* @param jobId Job object that should be removed from repository
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
*/
public abstract void deleteJob(long jobId, Connection conn);
@@ -326,7 +322,7 @@ public abstract class JdbcRepositoryHandler {
* Find job with given id in repository.
*
* @param jobId Job id
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
* @return Deserialized form of the job that is present in the repository
*/
public abstract MJob findJob(long jobId, Connection conn);
@@ -334,7 +330,7 @@ public abstract class JdbcRepositoryHandler {
/**
* Get all job objects.
*
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
* @return List will all saved job objects
*/
public abstract List<MJob> findJobs(Connection conn);
@@ -343,16 +339,15 @@ public abstract class JdbcRepositoryHandler {
* Save given submission in repository.
*
* @param submission Submission object
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
*/
- public abstract void createSubmission(MSubmission submission,
- Connection conn);
+ public abstract void createSubmission(MSubmission submission, Connection conn);
/**
* Check if submission with given id already exists in repository.
*
* @param submissionId Submission internal id
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
*/
public abstract boolean existsSubmission(long submissionId, Connection conn);
@@ -360,39 +355,38 @@ public abstract class JdbcRepositoryHandler {
* Update given submission in repository.
*
* @param submission Submission object
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
*/
- public abstract void updateSubmission(MSubmission submission,
- Connection conn);
+ public abstract void updateSubmission(MSubmission submission, Connection conn);
/**
* Remove submissions older then threshold from repository.
*
* @param threshold Threshold date
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
*/
public abstract void purgeSubmissions(Date threshold, Connection conn);
/**
* Return list of unfinished submissions (as far as repository is concerned).
*
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
* @return List of unfinished submissions.
*/
public abstract List<MSubmission> findSubmissionsUnfinished(Connection conn);
/**
- * Return list of all submissions from metadata repository.
+ * Return list of all submissions from the repository.
*
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
* @return List of all submissions.
*/
public abstract List<MSubmission> findSubmissions(Connection conn);
/**
- * Return list of submissions from metadata repository for given jobId.
+ * Return list of submissions from the repository for given jobId.
* @param jobId Job id
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
* @return List of submissions
*/
public abstract List<MSubmission> findSubmissionsForJob(long jobId, Connection conn);
@@ -401,7 +395,7 @@ public abstract class JdbcRepositoryHandler {
* Find last submission for given jobId.
*
* @param jobId Job id
- * @param conn Connection to metadata repository
+ * @param conn Connection to the repository
* @return Most recent submission
*/
public abstract MSubmission findSubmissionLastForJob(long jobId,