You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/10/22 05:48:10 UTC
[2/2] git commit: SQOOP-1557: Sqoop2: SQ_CONFIGURABLE ( for entities
who own configs)
SQOOP-1557: Sqoop2: SQ_CONFIGURABLE ( for entities who own configs)
(Veena Basavaraj via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/151a0a12
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/151a0a12
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/151a0a12
Branch: refs/heads/sqoop2
Commit: 151a0a12a96b32c7f9f08c0199fbd907ac6097da
Parents: 39a2200
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Oct 21 20:47:46 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Oct 21 20:47:46 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/sqoop/model/MConnector.java | 4 +
.../java/org/apache/sqoop/model/MDriver.java | 18 +-
.../java/org/apache/sqoop/driver/Driver.java | 4 +
.../apache/sqoop/repository/JdbcRepository.java | 28 +-
.../sqoop/repository/JdbcRepositoryHandler.java | 18 +-
.../org/apache/sqoop/repository/Repository.java | 16 +-
.../sqoop/repository/TestJdbcRepository.java | 52 +-
.../repository/derby/DerbyRepoConstants.java | 8 +-
.../sqoop/repository/derby/DerbyRepoError.java | 8 +-
.../derby/DerbyRepositoryHandler.java | 310 +++++-----
.../repository/derby/DerbySchemaConstants.java | 26 +-
.../repository/derby/DerbySchemaQuery.java | 583 +++++++++++--------
.../sqoop/repository/derby/DerbyTestCase.java | 89 ++-
.../repository/derby/TestConnectorHandling.java | 25 +-
.../repository/derby/TestDriverHandling.java | 70 +--
.../sqoop/repository/derby/TestJobHandling.java | 3 +-
.../sqoop/tools/tool/RepositoryLoadTool.java | 223 ++++---
17 files changed, 821 insertions(+), 664 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/common/src/main/java/org/apache/sqoop/model/MConnector.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MConnector.java b/common/src/main/java/org/apache/sqoop/model/MConnector.java
index 174d0b9..1b9462e 100644
--- a/common/src/main/java/org/apache/sqoop/model/MConnector.java
+++ b/common/src/main/java/org/apache/sqoop/model/MConnector.java
@@ -181,6 +181,10 @@ public final class MConnector extends Configurable {
return version;
}
+ public MConfigurableType getType() {
+ return MConfigurableType.CONNECTOR;
+ }
+
public SupportedDirections getSupportedDirections() {
return new SupportedDirections(this.getConfig(Direction.FROM) != null,
this.getConfig(Direction.TO) != null);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/common/src/main/java/org/apache/sqoop/model/MDriver.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MDriver.java b/common/src/main/java/org/apache/sqoop/model/MDriver.java
index 4241a31..cc47511 100644
--- a/common/src/main/java/org/apache/sqoop/model/MDriver.java
+++ b/common/src/main/java/org/apache/sqoop/model/MDriver.java
@@ -17,15 +17,17 @@
*/
package org.apache.sqoop.model;
-import java.sql.Driver;
/**
* Describes the configs associated with the {@link Driver} for executing sqoop jobs.
*/
public final class MDriver extends Configurable {
+ public static final String DRIVER_NAME = "SqoopDriver";
private final MDriverConfig driverConfig;
- private final String version;
+ private String version;
+ // Since there is only one Driver in the system, the name is not user specified
+ private static final String uniqueName = DRIVER_NAME;
public MDriver(MDriverConfig driverConfig, String version) {
this.driverConfig = driverConfig;
@@ -68,6 +70,14 @@ public final class MDriver extends Configurable {
return driverConfig;
}
+ public MConfigurableType getType() {
+ return MConfigurableType.DRIVER;
+ }
+
+ public String getUniqueName() {
+ return uniqueName;
+ }
+
@Override
public MDriver clone(boolean cloneWithValue) {
cloneWithValue = false;
@@ -79,4 +89,8 @@ public final class MDriver extends Configurable {
public String getVersion() {
return version;
}
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/main/java/org/apache/sqoop/driver/Driver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/Driver.java b/core/src/main/java/org/apache/sqoop/driver/Driver.java
index 46a16ac..6942891 100644
--- a/core/src/main/java/org/apache/sqoop/driver/Driver.java
+++ b/core/src/main/java/org/apache/sqoop/driver/Driver.java
@@ -158,6 +158,10 @@ public class Driver implements Reconfigurable {
return mDriver;
}
+ public static String getClassName() {
+ return Driver.getInstance().getClass().getSimpleName();
+ }
+
public ResourceBundle getBundle(Locale locale) {
return ResourceBundle.getBundle(DriverConstants.DRIVER_CONFIG_BUNDLE, locale);
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index 476830d..d7b526a 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -23,10 +23,11 @@ import java.util.List;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.driver.Driver;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MDriver;
import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MSubmission;
public class JdbcRepository extends Repository {
@@ -220,7 +221,7 @@ public class JdbcRepository extends Repository {
return (MDriver) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
- MDriver existingDriverConfig = handler.findDriver(conn);
+ MDriver existingDriverConfig = handler.findDriver(mDriver.getUniqueName(), conn);
if (existingDriverConfig == null) {
handler.registerDriver(mDriver, conn);
return mDriver;
@@ -233,7 +234,7 @@ public class JdbcRepository extends Repository {
return mDriver;
} else {
throw new SqoopException(RepositoryError.JDBCREPO_0026,
- "DriverConfig: " + mDriver.getPersistenceId());
+ "Driver: " + mDriver.getPersistenceId());
}
}
return existingDriverConfig;
@@ -246,6 +247,19 @@ public class JdbcRepository extends Repository {
* {@inheritDoc}
*/
@Override
+ public MDriver findDriver(final String shortName) {
+ return (MDriver) doWithConnection(new DoWithConnection() {
+ @Override
+ public Object doIt(Connection conn) throws Exception {
+ return handler.findDriver(shortName, conn);
+ }
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void createLink(final MLink link) {
doWithConnection(new DoWithConnection() {
@Override
@@ -648,23 +662,23 @@ public class JdbcRepository extends Repository {
* {@inheritDoc}
*/
@Override
- protected void upgradeConnectorConfigs(final MConnector newConnector,
+ protected void upgradeConnectorAndConfigs(final MConnector newConnector,
RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
- handler.upgradeConnectorConfigs(newConnector, conn);
+ handler.upgradeConnectorAndConfigs(newConnector, conn);
return null;
}
}, (JdbcRepositoryTransaction) tx);
}
- protected void upgradeDriverConfigs(final MDriver mDriver, RepositoryTransaction tx) {
+ protected void upgradeDriverAndConfigs(final MDriver mDriver, RepositoryTransaction tx) {
doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) throws Exception {
- handler.upgradeDriverConfigs(mDriver, conn);
+ handler.upgradeDriverAndConfigs(mDriver, conn);
return null;
}
}, (JdbcRepositoryTransaction) tx);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index 4c5229f..7d78826 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -21,10 +21,10 @@ import java.sql.Connection;
import java.util.Date;
import java.util.List;
-import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MDriver;
import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MSubmission;
/**
@@ -41,7 +41,7 @@ public abstract class JdbcRepositoryHandler {
/**
* Search for connector with given name in repository.
- * And return corresponding connector structure.
+ * And return corresponding connector entity.
*
* @param shortName Connector unique name
* @param conn JDBC link for querying repository.
@@ -101,8 +101,7 @@ public abstract class JdbcRepositoryHandler {
* @param conn JDBC link for querying repository
*/
- public abstract void upgradeConnectorConfigs(MConnector mConnector, Connection conn);
-
+ public abstract void upgradeConnectorAndConfigs(MConnector mConnector, Connection conn);
/**
* Upgrade the driver with the new data supplied in the
@@ -117,17 +116,16 @@ public abstract class JdbcRepositoryHandler {
* the driverConfig.
* @param conn JDBC link for querying repository
*/
- public abstract void upgradeDriverConfigs(MDriver mDriver, Connection conn);
-
+ public abstract void upgradeDriverAndConfigs(MDriver mDriver, Connection conn);
/**
- * Search for driverConfigin the repository.
- *
+ * Search for driver in the repository.
+ * @param shortName the name for the driver
* @param conn JDBC link for querying repository.
- * @return null if driverConfig are not yet present in repository or
+ * @return null if driver is not yet present in repository or
* loaded representation.
*/
- public abstract MDriver findDriver(Connection conn);
+ public abstract MDriver findDriver(String shortName, Connection conn);
/**
* Register driver config in repository.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index 8f78052..bd2a3be 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -119,6 +119,14 @@ public abstract class Repository {
public abstract List<MConnector> findConnectors();
/**
+ * Search for driver in the repository.
+ * @param shortName Driver unique name
+ * @return null if driver is not yet present in repository or
+ * loaded representation.
+ */
+ public abstract MDriver findDriver(String shortName);
+
+ /**
* Save given link to repository. This link must not be already
* present in the repository otherwise exception will be thrown.
*
@@ -317,7 +325,7 @@ public abstract class Repository {
* method will not call begin, commit,
* rollback or close on this transaction.
*/
- protected abstract void upgradeConnectorConfigs(MConnector newConnector, RepositoryTransaction tx);
+ protected abstract void upgradeConnectorAndConfigs(MConnector newConnector, RepositoryTransaction tx);
/**
* Upgrade the driver with the new data supplied in the
@@ -335,7 +343,7 @@ public abstract class Repository {
* method will not call begin, commit,
* rollback or close on this transaction.
*/
- protected abstract void upgradeDriverConfigs(MDriver newDriver, RepositoryTransaction tx);
+ protected abstract void upgradeDriverAndConfigs(MDriver newDriver, RepositoryTransaction tx);
/**
* Delete all inputs for a job
@@ -410,7 +418,7 @@ public abstract class Repository {
deletelinksAndJobs(existingLinksByConnector, existingJobsByConnector, tx);
// 5. Delete all inputs and configs associated with the connector, and
// insert the new configs and inputs for this connector
- upgradeConnectorConfigs(newConnector, tx);
+ upgradeConnectorAndConfigs(newConnector, tx);
// 6. Run upgrade logic for the configs related to the link objects
// dont always rely on the repository implementation to return empty list for links
if (existingLinksByConnector != null) {
@@ -514,7 +522,7 @@ public abstract class Repository {
deleteJobs(existingJobs, tx);
// 4. Delete all inputs and configs associated with the driver, and
// insert the new configs and inputs for this driver
- upgradeDriverConfigs(driver, tx);
+ upgradeDriverAndConfigs(driver, tx);
for (MJob job : existingJobs) {
// Make a new copy of the configs
http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
index ff9e0c3..ae0e922 100644
--- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
+++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
@@ -167,10 +167,10 @@ public class TestJdbcRepository {
*/
@Test
public void testDriverConfigEnableAutoUpgrade() {
- MDriver newDriverConfig = driver();
- MDriver oldDriverConfig = anotherDriver();
+ MDriver newDriver = driver();
+ MDriver oldDriver = anotherDriver();
- when(repoHandlerMock.findDriver(any(Connection.class))).thenReturn(oldDriverConfig);
+ when(repoHandlerMock.findDriver(anyString(), any(Connection.class))).thenReturn(oldDriver);
// make the upgradeDriverConfig to throw an exception to prove that it has been called
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@@ -178,10 +178,10 @@ public class TestJdbcRepository {
doThrow(exception).when(repoHandlerMock).findJobs(any(Connection.class));
try {
- repoSpy.registerDriver(newDriverConfig, true);
+ repoSpy.registerDriver(newDriver, true);
} catch (SqoopException ex) {
assertEquals(ex.getMessage(), exception.getMessage());
- verify(repoHandlerMock, times(1)).findDriver(any(Connection.class));
+ verify(repoHandlerMock, times(1)).findDriver(anyString(), any(Connection.class));
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
return ;
@@ -195,16 +195,16 @@ public class TestJdbcRepository {
*/
@Test
public void testDriverConfigDisableAutoUpgrade() {
- MDriver newDriverConfig = driver();
- MDriver oldDriverConfig = anotherDriver();
+ MDriver newDriver = driver();
+ MDriver oldDriver = anotherDriver();
- when(repoHandlerMock.findDriver(any(Connection.class))).thenReturn(oldDriverConfig);
+ when(repoHandlerMock.findDriver(anyString(), any(Connection.class))).thenReturn(oldDriver);
try {
- repoSpy.registerDriver(newDriverConfig, false);
+ repoSpy.registerDriver(newDriver, false);
} catch (SqoopException ex) {
assertEquals(ex.getErrorCode(), RepositoryError.JDBCREPO_0026);
- verify(repoHandlerMock, times(1)).findDriver(any(Connection.class));
+ verify(repoHandlerMock, times(1)).findDriver(anyString(),any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
return ;
}
@@ -242,7 +242,7 @@ public class TestJdbcRepository {
doReturn(jobList).when(repoSpy).findJobsForConnector(anyLong());
doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
- doNothing().when(repoSpy).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class));
+ doNothing().when(repoSpy).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class));
repoSpy.upgradeConnector(oldConnector, newConnector);
@@ -258,7 +258,7 @@ public class TestJdbcRepository {
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(1, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).deleteLinkInputs(2, repoTransactionMock);
- repoOrder.verify(repoSpy, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(RepositoryTransaction.class));
+ repoOrder.verify(repoSpy, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(2)).updateLink(any(MLink.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(4)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
repoOrder.verifyNoMoreInteractions();
@@ -296,7 +296,7 @@ public class TestJdbcRepository {
doReturn(jobList).when(repoSpy).findJobs();
doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
- doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
+ doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
repoSpy.upgradeDriver(newDriverConfig);
@@ -309,7 +309,7 @@ public class TestJdbcRepository {
repoOrder.verify(repoSpy, times(1)).getTransaction();
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
- repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
+ repoOrder.verify(repoSpy, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
repoOrder.verify(repoSpy, times(2)).updateJob(any(MJob.class), any(RepositoryTransaction.class));
repoOrder.verifyNoMoreInteractions();
txOrder.verify(repoTransactionMock, times(1)).begin();
@@ -339,7 +339,7 @@ public class TestJdbcRepository {
doReturn(jobList).when(repoSpy).findJobs();
doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
- doNothing().when(repoSpy).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
+ doNothing().when(repoSpy).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
try {
repoSpy.upgradeDriver(newDriverConfig);
@@ -355,7 +355,7 @@ public class TestJdbcRepository {
repoOrder.verify(repoSpy, times(1)).getTransaction();
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(1, repoTransactionMock);
repoOrder.verify(repoSpy, times(1)).deleteJobInputs(2, repoTransactionMock);
- repoOrder.verify(repoSpy, times(1)).upgradeDriverConfigs(any(MDriver.class), any(RepositoryTransaction.class));
+ repoOrder.verify(repoSpy, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(RepositoryTransaction.class));
repoOrder.verifyNoMoreInteractions();
txOrder.verify(repoTransactionMock, times(1)).begin();
txOrder.verify(repoTransactionMock, times(1)).rollback();
@@ -535,7 +535,7 @@ public class TestJdbcRepository {
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
"update connector error.");
- doThrow(exception).when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
+ doThrow(exception).when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
try {
repoSpy.upgradeConnector(oldConnector, newConnector);
@@ -545,7 +545,7 @@ public class TestJdbcRepository {
verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
- verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
+ verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
return ;
}
@@ -577,7 +577,7 @@ public class TestJdbcRepository {
doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class));
- doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
+ doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class));
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@@ -592,7 +592,7 @@ public class TestJdbcRepository {
verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
- verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
+ verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
verify(repoHandlerMock, times(1)).existsLink(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).updateLink(any(MLink.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
@@ -626,7 +626,7 @@ public class TestJdbcRepository {
doReturn(jobList).when(repoHandlerMock).findJobsForConnector(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
doNothing().when(repoHandlerMock).deleteLinkInputs(anyLong(), any(Connection.class));
- doNothing().when(repoHandlerMock).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
+ doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
doNothing().when(repoHandlerMock).updateLink(any(MLink.class), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsLink(anyLong(), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
@@ -643,7 +643,7 @@ public class TestJdbcRepository {
verify(repoHandlerMock, times(1)).findJobsForConnector(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).deleteLinkInputs(anyLong(), any(Connection.class));
- verify(repoHandlerMock, times(1)).upgradeConnectorConfigs(any(MConnector.class), any(Connection.class));
+ verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
verify(repoHandlerMock, times(2)).existsLink(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(2)).updateLink(any(MLink.class), any(Connection.class));
verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
@@ -731,7 +731,7 @@ public class TestJdbcRepository {
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
"update driverConfig entity error.");
- doThrow(exception).when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
+ doThrow(exception).when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
try {
repoSpy.upgradeDriver(newDriverConfig);
@@ -739,7 +739,7 @@ public class TestJdbcRepository {
assertEquals(ex.getMessage(), exception.getMessage());
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
- verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
+ verify(repoHandlerMock, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
return ;
}
@@ -764,7 +764,7 @@ public class TestJdbcRepository {
List<MJob> jobList = jobs(job(1,1,1,1,1), job(2,1,1,2,1));
doReturn(jobList).when(repoHandlerMock).findJobs(any(Connection.class));
doNothing().when(repoHandlerMock).deleteJobInputs(anyLong(), any(Connection.class));
- doNothing().when(repoHandlerMock).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
+ doNothing().when(repoHandlerMock).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
doReturn(true).when(repoHandlerMock).existsJob(anyLong(), any(Connection.class));
SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@@ -777,7 +777,7 @@ public class TestJdbcRepository {
assertEquals(ex.getMessage(), exception.getMessage());
verify(repoHandlerMock, times(1)).findJobs(any(Connection.class));
verify(repoHandlerMock, times(2)).deleteJobInputs(anyLong(), any(Connection.class));
- verify(repoHandlerMock, times(1)).upgradeDriverConfigs(any(MDriver.class), any(Connection.class));
+ verify(repoHandlerMock, times(1)).upgradeDriverAndConfigs(any(MDriver.class), any(Connection.class));
verify(repoHandlerMock, times(1)).existsJob(anyLong(), any(Connection.class));
verify(repoHandlerMock, times(1)).updateJob(any(MJob.class), any(Connection.class));
verifyNoMoreInteractions(repoHandlerMock);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
index 40dcc49..8fbf47f 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoConstants.java
@@ -22,13 +22,9 @@ public final class DerbyRepoConstants {
public static final String CONF_PREFIX_DERBY = "derby.";
@Deprecated
- // use only for the upgrade code should be removed soon
+ // use only for the upgrade code
public static final String SYSKEY_VERSION = "version";
-
- public static final String SYSKEY_DERBY_REPOSITORY_VERSION = "version";
-
- // TOOD(VB): SQOOP-1557 move the driver config version to the SQ_CONFIGURABLE, IT SHOULD NOT BE HERE, nor stored in SYSTEM table
- public static final String SYSKEY_DRIVER_CONFIG_VERSION = "driver.config.version";
+ public static final String SYSKEY_DERBY_REPOSITORY_VERSION = "repository.version";
/**
* Expected version of the repository structures.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
index 3e4a4a9..aad219e 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
@@ -188,8 +188,12 @@ public enum DerbyRepoError implements ErrorCode {
DERBYREPO_0048("Could not register config direction"),
- DERBYREPO_0049("Could not set connector direction")
- ;
+ DERBYREPO_0049("Could not set connector direction"),
+
+ /** The system was unable to register driver due to a server error **/
+ DERBYREPO_0050("Registration of driver failed"),
+
+ ;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index aa58850..633e9df 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -45,9 +45,11 @@ import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.common.SupportedDirections;
import org.apache.sqoop.connector.ConnectorHandler;
import org.apache.sqoop.connector.ConnectorManagerUtils;
+import org.apache.sqoop.driver.Driver;
import org.apache.sqoop.model.MBooleanInput;
import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MConfigType;
+import org.apache.sqoop.model.MConfigurableType;
import org.apache.sqoop.model.MConnector;
import org.apache.sqoop.model.MDriver;
import org.apache.sqoop.model.MDriverConfig;
@@ -102,7 +104,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
throw new SqoopException(DerbyRepoError.DERBYREPO_0011,
mc.getUniqueName());
}
- mc.setPersistenceId(getConnectorId(mc, conn));
+ mc.setPersistenceId(insertAndGetConnectorId(mc, conn));
insertConfigsForConnector(mc, conn);
}
@@ -116,10 +118,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
PreparedStatement baseConfigStmt = null;
PreparedStatement baseInputStmt = null;
try{
- baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE,
+ baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG,
Statement.RETURN_GENERATED_KEYS);
- baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
+ baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT,
Statement.RETURN_GENERATED_KEYS);
// Register the job config type, since driver config is per job
@@ -145,15 +147,14 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
PreparedStatement baseConfigStmt = null;
PreparedStatement baseInputStmt = null;
try{
- baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE,
+ baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG,
Statement.RETURN_GENERATED_KEYS);
- baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
+ baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT,
Statement.RETURN_GENERATED_KEYS);
- // Register link type config for connector
- // NOTE: The direction is null for LINK type
- registerConfigs(connectorId, null, mc.getLinkConfig().getConfigs(),
+ // Register link type config
+ registerConfigs(connectorId, null /* No direction for LINK type config*/, mc.getLinkConfig().getConfigs(),
MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn);
// Register both from/to job type config for connector
@@ -202,19 +203,20 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
}
- private long getConnectorId(MConnector mc, Connection conn) {
+ private long insertAndGetConnectorId(MConnector mc, Connection conn) {
PreparedStatement baseConnectorStmt = null;
try {
- baseConnectorStmt = conn.prepareStatement(STMT_INSERT_CONNECTOR_BASE,
- Statement.RETURN_GENERATED_KEYS);
+ baseConnectorStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE,
+ Statement.RETURN_GENERATED_KEYS);
baseConnectorStmt.setString(1, mc.getUniqueName());
baseConnectorStmt.setString(2, mc.getClassName());
baseConnectorStmt.setString(3, mc.getVersion());
+ baseConnectorStmt.setString(4, mc.getType().name());
int baseConnectorCount = baseConnectorStmt.executeUpdate();
if (baseConnectorCount != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0012,
- Integer.toString(baseConnectorCount));
+ Integer.toString(baseConnectorCount));
}
ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys();
@@ -222,19 +224,44 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
if (!rsetConnectorId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
}
-
- insertConnectorDirections(rsetConnectorId.getLong(1),
- mc.getSupportedDirections(), conn);
-
+ // a connector configurable also has directions
+ insertConnectorDirections(rsetConnectorId.getLong(1), mc.getSupportedDirections(), conn);
return rsetConnectorId.getLong(1);
} catch (SQLException ex) {
- throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
- mc.toString(), ex);
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mc.toString(), ex);
} finally {
closeStatements(baseConnectorStmt);
}
}
+ private long insertAndGetDriverId(MDriver mDriver, Connection conn) {
+ PreparedStatement baseDriverStmt = null;
+ try {
+ baseDriverStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE,
+ Statement.RETURN_GENERATED_KEYS);
+ baseDriverStmt.setString(1, mDriver.getUniqueName());
+ baseDriverStmt.setString(2, Driver.getClassName());
+ baseDriverStmt.setString(3, mDriver.getVersion());
+ baseDriverStmt.setString(4, mDriver.getType().name());
+
+ int baseDriverCount = baseDriverStmt.executeUpdate();
+ if (baseDriverCount != 1) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0012, Integer.toString(baseDriverCount));
+ }
+
+ ResultSet rsetDriverId = baseDriverStmt.getGeneratedKeys();
+
+ if (!rsetDriverId.next()) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
+ }
+ return rsetDriverId.getLong(1);
+ } catch (SQLException ex) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0050, mDriver.toString(), ex);
+ } finally {
+ closeStatements(baseDriverStmt);
+ }
+ }
+
/**
* {@inheritDoc}
*/
@@ -351,59 +378,6 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
/**
- * Detect version of the driver
- *
- * @param conn Connection to the repository
- * @return Version of the Driver
- */
- private String detectDriverVersion (Connection conn) {
- ResultSet rs = null;
- PreparedStatement stmt = null;
- try {
- stmt = conn.prepareStatement(DerbySchemaQuery.STMT_SELECT_SYSTEM);
- stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
- rs = stmt.executeQuery();
- if(!rs.next()) {
- return null;
- }
- return rs.getString(1);
- } catch (SQLException e) {
- LOG.info("Can't fetch driver version.", e);
- return null;
- } finally {
- closeResultSets(rs);
- closeStatements(stmt);
- }
- }
-
- /**
- * Create or update driver version
- * @param conn Connection to the the repository
- * @param mDriver
- */
- private void createOrUpdateDriverSystemVersion(Connection conn, String version) {
- ResultSet rs = null;
- PreparedStatement stmt = null;
- try {
- stmt = conn.prepareStatement(STMT_DELETE_SYSTEM);
- stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
- stmt.executeUpdate();
- closeStatements(stmt);
-
- stmt = conn.prepareStatement(STMT_INSERT_SYSTEM);
- stmt.setString(1, DerbyRepoConstants.SYSKEY_DRIVER_CONFIG_VERSION);
- stmt.setString(2, version);
- stmt.executeUpdate();
- } catch (SQLException e) {
- logException(e);
- throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e);
- } finally {
- closeResultSets(rs);
- closeStatements(stmt);
- }
- }
-
- /**
* {@inheritDoc}
*/
@Override
@@ -460,9 +434,11 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn);
// SQOOP-1498 rename entities
- renameEntitiesForUpgrade(conn);
+ renameEntitiesForConnectionAndForm(conn);
// Change direction from VARCHAR to BIGINT + foreign key.
updateDirections(conn, insertDirections(conn));
+
+ renameConnectorToConfigurable(conn);
}
// Add unique constraints on job and links for version 4 onwards
if (repositoryVersion > 3) {
@@ -474,7 +450,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
// SQOOP-1498 refactoring related upgrades for table and column names
- void renameEntitiesForUpgrade(Connection conn) {
+ void renameEntitiesForConnectionAndForm(Connection conn) {
// LINK
// drop the constraint before rename
runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONSTRAINT_1, conn);
@@ -491,6 +467,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_7, conn);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTION_COLUMN_8, conn);
+ // rename constraints
+ runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTION_CONNECTOR_CONSTRAINT, conn);
+ runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONNECTOR_CONSTRAINT, conn);
+
LOG.info("LINK TABLE altered");
// LINK_INPUT
@@ -511,6 +491,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_4, conn);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_5, conn);
runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_FORM_COLUMN_6, conn);
+ runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_FORM_CONNECTOR_CONSTRAINT, conn);
+ runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT, conn);
LOG.info("CONFIG TABLE altered");
@@ -528,7 +510,24 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_JOB_CONSTRAINT_TO, conn);
LOG.info("JOB TABLE altered and constraints added");
+ }
+
+ private void renameConnectorToConfigurable(Connection conn) {
+ // SQ_CONNECTOR to SQ_CONFIGURABLE upgrade
+ runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONFIG_CONNECTOR_CONSTRAINT, conn);
+ runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_LINK_CONSTRAINT, conn);
+ runQuery(QUERY_UPGRADE_DROP_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT, conn);
+
+ runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONNECTOR_TO_SQ_CONFIGURABLE, conn);
+ runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_CONFIG_COLUMN_1, conn);
+ runQuery(QUERY_UPGRADE_RENAME_TABLE_SQ_LINK_COLUMN_1, conn);
+ runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_COLUMN_SQC_TYPE, conn);
+ runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONFIG_CONFIGURABLE_CONSTRAINT, conn);
+ runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_LINK_CONFIGURABLE_CONSTRAINT, conn);
+ runQuery(QUERY_UPGRADE_ADD_TABLE_SQ_CONNECTOR_DIRECTION_CONSTRAINT, conn);
+
+ LOG.info("CONNECTOR TABLE altered and constraints added for CONFIGURABLE");
}
private void upgradeRepositoryVersion(Connection conn) {
@@ -538,7 +537,6 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
runQuery(STMT_INSERT_SYSTEM, conn, DerbyRepoConstants.SYSKEY_DERBY_REPOSITORY_VERSION, ""
+ DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
}
-
/**
* Insert directions: FROM and TO.
* @param conn
@@ -643,6 +641,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
}
+
/**
* Upgrade job data from IMPORT/EXPORT to FROM/TO.
* Since the framework is no longer responsible for HDFS,
@@ -712,13 +711,13 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DRIVER_INDEX, conn,
new Long(0), "throttling");
- Long linkId = createHdfsConnection(conn, connectorId);
+ Long connectionId = createHdfsConnection(conn, connectorId);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION, conn,
"EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION, conn,
- new Long(linkId), "EXPORT");
+ new Long(connectionId), "EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION, conn,
- new Long(linkId), "IMPORT");
+ new Long(connectionId), "IMPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn,
"fromJobConfig", "table", Direction.FROM.toString());
@@ -738,6 +737,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
* Pre-register HDFS Connector so that config upgrade will work.
* NOTE: This should be used only in the upgrade path
*/
+ @Deprecated
protected long registerHdfsConnector(Connection conn) {
if (LOG.isTraceEnabled()) {
LOG.trace("Begin HDFS Connector pre-loading.");
@@ -760,7 +760,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
try {
PreparedStatement baseConnectorStmt = conn.prepareStatement(
- STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS,
+ STMT_INSERT_INTO_CONFIGURABLE_WITHOUT_SUPPORTED_DIRECTIONS,
Statement.RETURN_GENERATED_KEYS);
baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName());
baseConnectorStmt.setString(2, handler.getConnectorConfigurable().getClassName());
@@ -854,21 +854,20 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
LOG.debug("Looking up connector: " + shortName);
}
MConnector mc = null;
- PreparedStatement baseConnectorFetchStmt = null;
+ PreparedStatement connectorFetchStmt = null;
try {
- baseConnectorFetchStmt = conn.prepareStatement(STMT_FETCH_BASE_CONNECTOR);
- baseConnectorFetchStmt.setString(1, shortName);
+ connectorFetchStmt = conn.prepareStatement(STMT_SELECT_FROM_CONFIGURABLE);
+ connectorFetchStmt.setString(1, shortName);
- List<MConnector> connectors = loadConnectors(baseConnectorFetchStmt, conn);
+ List<MConnector> connectors = loadConnectors(connectorFetchStmt, conn);
- if (connectors.size()==0) {
+ if (connectors.size() == 0) {
LOG.debug("No connector found by name: " + shortName);
return null;
- } else if (connectors.size()==1) {
+ } else if (connectors.size() == 1) {
LOG.debug("Looking up connector: " + shortName + ", found: " + mc);
return connectors.get(0);
- }
- else {
+ } else {
throw new SqoopException(DerbyRepoError.DERBYREPO_0005, shortName);
}
@@ -876,7 +875,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
logException(ex, shortName);
throw new SqoopException(DerbyRepoError.DERBYREPO_0004, shortName, ex);
} finally {
- closeStatements(baseConnectorFetchStmt);
+ closeStatements(connectorFetchStmt);
}
}
@@ -887,7 +886,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
public List<MConnector> findConnectors(Connection conn) {
PreparedStatement stmt = null;
try {
- stmt = conn.prepareStatement(STMT_SELECT_CONNECTOR_ALL);
+ stmt = conn.prepareStatement(STMT_SELECT_CONFIGURABLE_ALL_FOR_TYPE);
+ stmt.setString(1, MConfigurableType.CONNECTOR.name());
+
return loadConnectors(stmt,conn);
} catch (SQLException ex) {
logException(ex);
@@ -897,84 +898,101 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
}
-
/**
* {@inheritDoc}
*/
@Override
public void registerDriver(MDriver mDriver, Connection conn) {
if (mDriver.hasPersistenceId()) {
- throw new SqoopException(DerbyRepoError.DERBYREPO_0011,
- "Driver");
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0011, mDriver.getUniqueName());
}
+ mDriver.setPersistenceId(insertAndGetDriverId(mDriver, conn));
+ insertConfigsforDriver(mDriver, conn);
+ }
+ private void insertConfigsforDriver(MDriver mDriver, Connection conn) {
PreparedStatement baseConfigStmt = null;
PreparedStatement baseInputStmt = null;
try {
- baseConfigStmt = conn.prepareStatement(STMT_INSERT_CONFIG_BASE,
+ baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG,
Statement.RETURN_GENERATED_KEYS);
- baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
+ baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT,
Statement.RETURN_GENERATED_KEYS);
// Register a driver config as a job type with no owner/connector and direction
- registerConfigs(null/* owner*/, null /*direction*/, mDriver.getDriverConfig().getConfigs(),
+ registerConfigs(mDriver.getPersistenceId(), null /* no direction*/, mDriver.getDriverConfig().getConfigs(),
MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
- // We're using hardcoded value for driver config as they are
- // represented as NULL in the database.
- mDriver.setPersistenceId(1);
} catch (SQLException ex) {
logException(ex, mDriver);
throw new SqoopException(DerbyRepoError.DERBYREPO_0014, ex);
} finally {
closeStatements(baseConfigStmt, baseInputStmt);
}
- createOrUpdateDriverSystemVersion(conn, mDriver.getVersion());
}
/**
* {@inheritDoc}
*/
@Override
- public MDriver findDriver(Connection conn) {
- LOG.debug("Looking up Driver config to create a driver ");
- MDriver mDriver = null;
+ public MDriver findDriver(String shortName, Connection conn) {
+ LOG.debug("Looking up Driver and config ");
+ PreparedStatement driverFetchStmt = null;
PreparedStatement driverConfigFetchStmt = null;
PreparedStatement driverConfigInputFetchStmt = null;
+
+ MDriver mDriver;
try {
- driverConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_DRIVER);
- driverConfigInputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT);
+ driverFetchStmt = conn.prepareStatement(STMT_SELECT_FROM_CONFIGURABLE);
+ driverFetchStmt.setString(1, shortName);
+
+ ResultSet rsDriverSet = driverFetchStmt.executeQuery();
+ if (!rsDriverSet.next()) {
+ return null;
+ }
+ Long driverId = rsDriverSet.getLong(1);
+ String driverVersion = rsDriverSet.getString(4);
+
+ driverConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+ driverConfigFetchStmt.setLong(1, driverId);
+
+ driverConfigInputFetchStmt = conn.prepareStatement(STMT_SELECT_INPUT);
List<MConfig> driverConfigs = new ArrayList<MConfig>();
loadDriverConfigs(driverConfigs, driverConfigFetchStmt, driverConfigInputFetchStmt, 1);
- if(driverConfigs.isEmpty()) {
+ if (driverConfigs.isEmpty()) {
return null;
}
-
- mDriver = new MDriver(new MDriverConfig(driverConfigs), detectDriverVersion(conn));
- mDriver.setPersistenceId(1);
+ mDriver = new MDriver(new MDriverConfig(driverConfigs), driverVersion);
+ mDriver.setPersistenceId(driverId);
} catch (SQLException ex) {
- throw new SqoopException(DerbyRepoError.DERBYREPO_0004,
- "Driver config", ex);
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0004, "Driver", ex);
} finally {
if (driverConfigFetchStmt != null) {
try {
driverConfigFetchStmt.close();
} catch (SQLException ex) {
- LOG.error("Unable to close config fetch statement", ex);
+ LOG.error("Unable to close driver config fetch statement", ex);
}
}
if (driverConfigInputFetchStmt != null) {
try {
driverConfigInputFetchStmt.close();
} catch (SQLException ex) {
- LOG.error("Unable to close input fetch statement", ex);
+ LOG.error("Unable to close driver input fetch statement", ex);
+ }
+ }
+ if (driverFetchStmt != null) {
+ try {
+ driverFetchStmt.close();
+ } catch (SQLException ex) {
+ LOG.error("Unable to close driver fetch statement", ex);
}
}
}
- LOG.debug("Looking up Driver config and created driver:" + mDriver);
+ LOG.debug("Looked up Driver and config");
return mDriver;
}
@@ -1228,7 +1246,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
public List<MLink> findLinksForConnector(long connectorID, Connection conn) {
PreparedStatement stmt = null;
try {
- stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR);
+ stmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR_CONFIGURABLE);
stmt.setLong(1, connectorID);
return loadLinks(stmt, conn);
} catch (SQLException ex) {
@@ -1243,7 +1261,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
* {@inheritDoc}
*/
@Override
- public void upgradeConnectorConfigs(MConnector mConnector, Connection conn) {
+ public void upgradeConnectorAndConfigs(MConnector mConnector, Connection conn) {
updateConnectorAndDeleteConfigs(mConnector, conn);
insertConfigsForConnector(mConnector, conn);
}
@@ -1253,13 +1271,14 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
PreparedStatement deleteConfig = null;
PreparedStatement deleteInput = null;
try {
- updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONNECTOR);
- deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONNECTOR);
- deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONNECTOR);
+ updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONFIGURABLE);
+ deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONFIGURABLE);
+ deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONFIGURABLE);
updateConnectorStatement.setString(1, mConnector.getUniqueName());
updateConnectorStatement.setString(2, mConnector.getClassName());
updateConnectorStatement.setString(3, mConnector.getVersion());
- updateConnectorStatement.setLong(4, mConnector.getPersistenceId());
+ updateConnectorStatement.setString(4, mConnector.getType().name());
+ updateConnectorStatement.setLong(5, mConnector.getPersistenceId());
if (updateConnectorStatement.executeUpdate() != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0038);
@@ -1281,19 +1300,30 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
* {@inheritDoc}
*/
@Override
- public void upgradeDriverConfigs(MDriver mDriver, Connection conn) {
+ public void upgradeDriverAndConfigs(MDriver mDriver, Connection conn) {
updateDriverAndDeleteConfigs(mDriver, conn);
- createOrUpdateDriverSystemVersion(conn, mDriver.getVersion());
insertConfigsForDriver(mDriver, conn);
}
private void updateDriverAndDeleteConfigs(MDriver mDriver, Connection conn) {
+ PreparedStatement updateDriverStatement = null;
PreparedStatement deleteConfig = null;
PreparedStatement deleteInput = null;
try {
- deleteInput = conn.prepareStatement(STMT_DELETE_DRIVER_INPUTS);
- deleteConfig = conn.prepareStatement(STMT_DELETE_DRIVER_CONFIGS);
-
+ updateDriverStatement = conn.prepareStatement(STMT_UPDATE_CONFIGURABLE);
+ deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONFIGURABLE);
+ deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONFIGURABLE);
+ updateDriverStatement.setString(1, mDriver.getUniqueName());
+ updateDriverStatement.setString(2, Driver.getClassName());
+ updateDriverStatement.setString(3, mDriver.getVersion());
+ updateDriverStatement.setString(4, mDriver.getType().name());
+ updateDriverStatement.setLong(5, mDriver.getPersistenceId());
+
+ if (updateDriverStatement.executeUpdate() != 1) {
+ throw new SqoopException(DerbyRepoError.DERBYREPO_0038);
+ }
+ deleteInput.setLong(1, mDriver.getPersistenceId());
+ deleteConfig.setLong(1, mDriver.getPersistenceId());
deleteInput.executeUpdate();
deleteConfig.executeUpdate();
@@ -1301,7 +1331,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
logException(e, mDriver);
throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e);
} finally {
- closeStatements(deleteConfig, deleteInput);
+ closeStatements(updateDriverStatement, deleteConfig, deleteInput);
}
}
@@ -1557,7 +1587,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
public List<MJob> findJobsForConnector(long connectorId, Connection conn) {
PreparedStatement stmt = null;
try {
- stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR);
+ stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE);
stmt.setLong(1, connectorId);
stmt.setLong(2, connectorId);
return loadJobs(stmt, conn);
@@ -2080,8 +2110,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
try {
rsConnectors = stmt.executeQuery();
- connectorConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR);
- connectorConfigInputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT);
+ connectorConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+ connectorConfigInputFetchStmt = conn.prepareStatement(STMT_SELECT_INPUT);
while(rsConnectors.next()) {
long connectorId = rsConnectors.getLong(1);
@@ -2116,9 +2146,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
} finally {
closeResultSets(rsConnectors);
- closeStatements(connectorConfigFetchStmt,connectorConfigInputFetchStmt);
+ closeStatements(connectorConfigFetchStmt, connectorConfigInputFetchStmt);
}
-
return connectors;
}
@@ -2134,7 +2163,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
rsConnection = stmt.executeQuery();
//
- connectorConfigFetchStatement = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR);
+ connectorConfigFetchStatement = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
connectorConfigInputStatement = conn.prepareStatement(STMT_FETCH_LINK_INPUT);
while(rsConnection.next()) {
@@ -2189,14 +2218,15 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
try {
rsJob = stmt.executeQuery();
-
- fromConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR);
- toConfigFetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_CONNECTOR);
- driverConfigfetchStmt = conn.prepareStatement(STMT_FETCH_CONFIG_DRIVER);
+ // Note: Job does not hold an explicit reference to the driver since every
+ // job has the same driver
+ long driverId = this.findDriver(MDriver.DRIVER_NAME, conn).getPersistenceId();
+ fromConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+ toConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+ driverConfigfetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
jobInputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT);
while(rsJob.next()) {
- // why use connector? why cant it be link id?
long fromConnectorId = rsJob.getLong(1);
long toConnectorId = rsJob.getLong(2);
long id = rsJob.getLong(3);
@@ -2211,9 +2241,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
fromConfigFetchStmt.setLong(1, fromConnectorId);
toConfigFetchStmt.setLong(1,toConnectorId);
+ driverConfigfetchStmt.setLong(1, driverId);
jobInputFetchStmt.setLong(1, id);
- //inputFetchStmt.setLong(1, XXX); // Will be filled by loadFrameworkConfigs
jobInputFetchStmt.setLong(3, id);
// FROM entity configs
@@ -2283,7 +2313,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
*
* Use given prepared statements to create entire config structure in database.
*
- * @param connectorId
+ * @param configurableId
* @param configs
* @param type
* @param baseConfigStmt
@@ -2292,17 +2322,17 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
* @return short number of configs registered.
* @throws SQLException
*/
- private short registerConfigs(Long connectorId, Direction direction,
+ private short registerConfigs(Long configurableId, Direction direction,
List<MConfig> configs, String type, PreparedStatement baseConfigStmt,
PreparedStatement baseInputStmt, Connection conn)
throws SQLException {
short configIndex = 0;
for (MConfig config : configs) {
- if(connectorId == null) {
+ if (configurableId == null) {
baseConfigStmt.setNull(1, Types.BIGINT);
} else {
- baseConfigStmt.setLong(1, connectorId);
+ baseConfigStmt.setLong(1, configurableId);
}
baseConfigStmt.setString(2, config.getName());
http://git-wip-us.apache.org/repos/asf/sqoop/blob/151a0a12/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
index 59773e1..de08261 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java
@@ -49,9 +49,14 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQD_NAME = "SQD_NAME";
// SQ_CONNECTOR
+ @Deprecated // used only for upgrade
public static final String TABLE_SQ_CONNECTOR_NAME = "SQ_CONNECTOR";
+ // SQ_CONFIGURABLE
+ public static final String TABLE_SQ_CONFIGURABLE_NAME = "SQ_CONFIGURABLE";
+ @Deprecated // used only for upgrade
public static final String TABLE_SQ_CONNECTOR = SCHEMA_PREFIX + TABLE_SQ_CONNECTOR_NAME;
+ public static final String TABLE_SQ_CONFIGURABLE = SCHEMA_PREFIX + TABLE_SQ_CONFIGURABLE_NAME;
public static final String COLUMN_SQC_ID = "SQC_ID";
@@ -61,6 +66,8 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQC_VERSION = "SQC_VERSION";
+ public static final String COLUMN_SQC_TYPE = "SQC_TYPE";
+
// SQ_CONNECTOR_DIRECTIONS
public static final String TABLE_SQ_CONNECTOR_DIRECTIONS_NAME = "SQ_CONNECTOR_DIRECTIONS";
@@ -75,12 +82,10 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQCD_DIRECTION = "SQCD_DIRECTION";
public static final String CONSTRAINT_SQCD_SQC_NAME = CONSTRAINT_PREFIX + "SQCD_SQC";
-
// FK to the SQ_CONNECTOR table
public static final String CONSTRAINT_SQCD_SQC = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQC_NAME;
public static final String CONSTRAINT_SQCD_SQD_NAME = CONSTRAINT_PREFIX + "SQCD_SQD";
-
// FK to the SQ_DIRECTION table
public static final String CONSTRAINT_SQCD_SQD = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQD_NAME;
@@ -99,7 +104,10 @@ public final class DerbySchemaConstants {
@Deprecated // used only for upgrade
public static final String COLUMN_SQF_CONNECTOR = "SQF_CONNECTOR";
+ @Deprecated // used only for upgrade path
public static final String COLUMN_SQ_CFG_CONNECTOR = "SQ_CFG_CONNECTOR";
+ // note this column was renamed again
+ public static final String COLUMN_SQ_CFG_CONFIGURABLE = "SQ_CFG_CONFIGURABLE";
@Deprecated // used only for upgrade
public static final String COLUMN_SQF_OPERATION = "SQF_OPERATION";
@@ -125,8 +133,8 @@ public final class DerbySchemaConstants {
@Deprecated // used only for upgrade
public static final String CONSTRAINT_SQF_SQC = SCHEMA_PREFIX + CONSTRAINT_SQF_SQC_NAME;
+ // FK constraint on configurable
public static final String CONSTRAINT_SQ_CFG_SQC_NAME = CONSTRAINT_PREFIX + "SQ_CFG_SQC";
-
public static final String CONSTRAINT_SQ_CFG_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_SQC_NAME;
// SQ_CONFIG_DIRECTIONS
@@ -202,7 +210,11 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQ_LNK_NAME = "SQ_LNK_NAME";
@Deprecated // used only for upgrade
public static final String COLUMN_SQN_CONNECTOR = "SQN_CONNECTOR";
+ @Deprecated // used only for upgrade
public static final String COLUMN_SQ_LNK_CONNECTOR = "SQ_LNK_CONNECTOR";
+ // Note this column has been renamed twice
+ public static final String COLUMN_SQ_LNK_CONFIGURABLE = "SQ_LNK_CONFIGURABLE";
+
@Deprecated // used only for upgrade
public static final String COLUMN_SQN_CREATION_USER = "SQN_CREATION_USER";
public static final String COLUMN_SQ_LNK_CREATION_USER = "SQ_LNK_CREATION_USER";
@@ -225,10 +237,10 @@ public final class DerbySchemaConstants {
@Deprecated
public static final String CONSTRAINT_SQN_SQC = SCHEMA_PREFIX + CONSTRAINT_SQN_SQC_NAME;
+ // FK constraint on the connector configurable
public static final String CONSTRAINT_SQ_LNK_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_LNK_SQC_NAME;
public static final String CONSTRAINT_SQ_LNK_NAME_UNIQUE_NAME = CONSTRAINT_PREFIX + "SQ_LNK_NAME_UNIQUE";
-
public static final String CONSTRAINT_SQ_LNK_NAME_UNIQUE = SCHEMA_PREFIX + CONSTRAINT_SQ_LNK_NAME_UNIQUE_NAME;
// SQ_JOB
@@ -437,12 +449,12 @@ public final class DerbySchemaConstants {
static {
tablesV1 = new HashSet<String>();
tablesV1.add(TABLE_SQ_CONNECTOR_NAME);
- tablesV1.add(TABLE_SQ_LINK_NAME);
- tablesV1.add(TABLE_SQ_LINK_INPUT_NAME);
+ tablesV1.add(TABLE_SQ_CONNECTION_NAME);
+ tablesV1.add(TABLE_SQ_CONNECTION_INPUT_NAME);
tablesV1.add(TABLE_SQ_COUNTER_NAME);
tablesV1.add(TABLE_SQ_COUNTER_GROUP_NAME);
tablesV1.add(TABLE_SQ_COUNTER_SUBMISSION_NAME);
- tablesV1.add(TABLE_SQ_CONFIG_NAME);
+ tablesV1.add(TABLE_SQ_FORM_NAME);
tablesV1.add(TABLE_SQ_INPUT_NAME);
tablesV1.add(TABLE_SQ_JOB_NAME);
tablesV1.add(TABLE_SQ_JOB_INPUT_NAME);