You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/02/25 22:57:39 UTC

sqoop git commit: SQOOP-1516: Sqoop2: Config Input as a Top Level Entity - RepositoryAPI changes

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 410ee0462 -> 61dfb6db4


SQOOP-1516: Sqoop2: Config Input as a Top Level Entity - RepositoryAPI changes

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/61dfb6db
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/61dfb6db
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/61dfb6db

Branch: refs/heads/sqoop2
Commit: 61dfb6db45d7f4ab7693b13d535626c6d4f2dd74
Parents: 410ee04
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Wed Feb 25 13:56:56 2015 -0800
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Wed Feb 25 13:56:56 2015 -0800

----------------------------------------------------------------------
 .../sqoop/error/code/CommonRepositoryError.java |  16 ++
 .../org/apache/sqoop/model/MConfigType.java     |  26 ++-
 .../sqoop/model/MConfigUpdateEntityType.java    |  34 ++++
 .../main/java/org/apache/sqoop/model/MLink.java |   4 +-
 .../java/org/apache/sqoop/model/MMapInput.java  |   2 +-
 .../apache/sqoop/repository/JdbcRepository.java | 117 +++++++++++++-
 .../sqoop/repository/JdbcRepositoryHandler.java |  62 +++++++-
 .../org/apache/sqoop/repository/Repository.java |  75 +++++++++
 .../common/CommonRepositoryHandler.java         | 137 ++++++++++++++--
 ...RepositoryInsertUpdateDeleteSelectQuery.java |  23 +++
 .../sqoop/repository/derby/DerbyTestCase.java   |   6 +-
 .../sqoop/repository/derby/TestJobHandling.java | 159 +++++++++++++++++--
 .../repository/derby/TestLinkHandling.java      |  44 +++++
 13 files changed, 673 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/common/src/main/java/org/apache/sqoop/error/code/CommonRepositoryError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/CommonRepositoryError.java b/common/src/main/java/org/apache/sqoop/error/code/CommonRepositoryError.java
index 74e5de3..7db31dd 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/CommonRepositoryError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/CommonRepositoryError.java
@@ -196,6 +196,22 @@ public enum CommonRepositoryError implements ErrorCode {
 
   COMMON_0048("Config Input overrides could not be fetched"),
 
+  COMMON_0049("Unable to fetch FROM job config"),
+
+  COMMON_0050("Unable to fetch TO job config"),
+
+  COMMON_0051("Unable to fetch DRIVER job config"),
+
+  COMMON_0052("Unable to fetch LINK config"),
+
+  COMMON_0053("Unable to update job config"),
+
+  COMMON_0054("Unable to update link config"),
+
+  COMMON_0055("Unable to update CONNECTOR_ONLY editable config"),
+
+  COMMON_0056("Unable to update  USER_ONLY editable config"),
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/common/src/main/java/org/apache/sqoop/model/MConfigType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MConfigType.java b/common/src/main/java/org/apache/sqoop/model/MConfigType.java
index fa29d5a..a8602fc 100644
--- a/common/src/main/java/org/apache/sqoop/model/MConfigType.java
+++ b/common/src/main/java/org/apache/sqoop/model/MConfigType.java
@@ -20,6 +20,11 @@ package org.apache.sqoop.model;
 import org.apache.sqoop.classification.InterfaceAudience;
 import org.apache.sqoop.classification.InterfaceStability;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 /**
  * Represents the various config types supported by the system.
  */
@@ -35,9 +40,26 @@ public enum MConfigType {
   CONNECTION,
 
   /** link config type */
-  LINK,
+  LINK("link"),
 
   /** Job config type */
-  JOB;
+  // NOTE: cannot use the constants declared below since it is not declared yet
+  // compiler restriction
+  JOB("from", "to", "driver");
+
+  private final Set<String> subTypes;
+
+  MConfigType(String... subTypes) {
+    Set<String> subT = new HashSet<String>();
+    subT.addAll(Arrays.asList(subTypes));
+    this.subTypes = Collections.unmodifiableSet(subT);
+  }
+
+  public static Set<String> getSubTypes(MConfigType type) {
+    return type.subTypes;
+  }
 
+  public static final String FROM_SUB_TYPE = "from";
+  public static final String TO_SUB_TYPE = "to";
+  public static final String DRIVER_SUB_TYPE = "driver";
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/common/src/main/java/org/apache/sqoop/model/MConfigUpdateEntityType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MConfigUpdateEntityType.java b/common/src/main/java/org/apache/sqoop/model/MConfigUpdateEntityType.java
new file mode 100644
index 0000000..6847804
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/model/MConfigUpdateEntityType.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.model;
+
+/**
+ * Represents the entities that can edit the config input values.
+ *
+ */
+public enum MConfigUpdateEntityType {
+
+  /** update config values via rest API or command line */
+  USER,
+
+  /** update config values via connector upgrade tool */
+  CONNECTOR,
+
+  ;
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/common/src/main/java/org/apache/sqoop/model/MLink.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MLink.java b/common/src/main/java/org/apache/sqoop/model/MLink.java
index 062a4c5..01ff5df 100644
--- a/common/src/main/java/org/apache/sqoop/model/MLink.java
+++ b/common/src/main/java/org/apache/sqoop/model/MLink.java
@@ -81,8 +81,8 @@ public class MLink extends MAccountableEntity implements MClonable {
   public MLinkConfig getConnectorLinkConfig() {
     return connectorLinkConfig;
   }
-  public MConfig getConnectorLinkConfig(String formName) {
-    return connectorLinkConfig.getConfig(formName);
+  public MConfig getConnectorLinkConfig(String configName) {
+    return connectorLinkConfig.getConfig(configName);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/common/src/main/java/org/apache/sqoop/model/MMapInput.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MMapInput.java b/common/src/main/java/org/apache/sqoop/model/MMapInput.java
index ce0f0f7..973b5fa 100644
--- a/common/src/main/java/org/apache/sqoop/model/MMapInput.java
+++ b/common/src/main/java/org/apache/sqoop/model/MMapInput.java
@@ -48,7 +48,7 @@ public final class MMapInput extends MInput<Map<String, String>> {
         vsb.append("&");
       }
       vsb.append(UrlSafeUtils.urlEncode(entry.getKey())).append("=");
-      vsb.append(UrlSafeUtils.urlEncode(entry.getValue()));
+      vsb.append(entry.getValue() != null ? UrlSafeUtils.urlEncode(entry.getValue()): null);
     }
     return vsb.toString();
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index 091c3ca..9c5e15e 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -23,8 +23,11 @@ import java.util.List;
 
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MConfigUpdateEntityType;
 import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.model.MDriver;
+import org.apache.sqoop.model.MInput;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
 import org.apache.sqoop.model.MSubmission;
@@ -657,6 +660,119 @@ public class JdbcRepository extends Repository {
     });
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public MConfig findFromJobConfig(final long jobId, final String configName) {
+    return (MConfig) doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) {
+        if (!handler.existsJob(jobId, conn)) {
+          throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId);
+        }
+        return handler.findFromJobConfig(jobId, configName, conn);
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public MConfig findToJobConfig(final long jobId, final String configName) {
+    return (MConfig) doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) {
+        if (!handler.existsJob(jobId, conn)) {
+          throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId);
+        }
+        return handler.findToJobConfig(jobId, configName, conn);
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public MConfig findDriverJobConfig(final long jobId, final String configName) {
+    return (MConfig) doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) {
+        if (!handler.existsJob(jobId, conn)) {
+          throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId);
+        }
+        return handler.findDriverJobConfig(jobId, configName, conn);
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public MConfig findLinkConfig(final long linkId, final String configName) {
+    return (MConfig) doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) {
+        if (!handler.existsLink(linkId, conn)) {
+          throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid id: " + linkId);
+        }
+        return handler.findLinkConfig(linkId, configName, conn);
+      }
+    });
+  }
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void updateJobConfig(final long jobId, final MConfig config, final MConfigUpdateEntityType type) {
+    updateJobConfig(jobId, config, null);
+  }
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void updateJobConfig(final long jobId, final MConfig config, final MConfigUpdateEntityType type, RepositoryTransaction tx) {
+    doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) {
+        if (!handler.existsJob(jobId, conn)) {
+          throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId);
+        }
+        handler.updateJobConfig(jobId, config, type, conn);
+        return null;
+      }
+    }, (JdbcRepositoryTransaction) tx);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void updateLinkConfig(final long linkId, final MConfig config, final MConfigUpdateEntityType type) {
+    updateLinkConfig(linkId, config, type, null);
+  }
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void updateLinkConfig(final long linkId, final MConfig config, final MConfigUpdateEntityType type, RepositoryTransaction tx) {
+    doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) {
+        if (!handler.existsLink(linkId, conn)) {
+          throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid id: " + linkId);
+        }
+        handler.updateLinkConfig(linkId, config, type,  conn);
+        return null;
+      }
+    }, (JdbcRepositoryTransaction) tx);
+  }
+
   @Override
   protected void deleteJobInputs(final long jobID, RepositoryTransaction tx) {
     doWithConnection(new DoWithConnection() {
@@ -666,7 +782,6 @@ public class JdbcRepository extends Repository {
         return null;
       }
     }, (JdbcRepositoryTransaction) tx);
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index f4b1374..f690887 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -21,6 +21,8 @@ import java.sql.Connection;
 import java.util.Date;
 import java.util.List;
 
+import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MConfigUpdateEntityType;
 import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.model.MDriver;
 import org.apache.sqoop.model.MJob;
@@ -416,4 +418,62 @@ public abstract class JdbcRepositoryHandler {
    * @return Most recent submission
    */
   public abstract MSubmission findLastSubmissionForJob(long jobId, Connection conn);
-}
\ No newline at end of file
+
+  /**
+   * fetch the job config for the FROM type for the given name
+   * @param jobId id of the job
+   * @param configName name of the config unique to this job and type
+   * @param conn Connection to the repository
+   * @return config object
+   */
+   public abstract MConfig findFromJobConfig(long jobId, String configName, Connection con);
+
+
+   /**
+    * fetch the job config for the TO type for the given name
+    * @param jobId id of the job
+    * @param configName name of the config unique to this job and type
+    * @param conn Connection to the repository
+    * @return config object
+    */
+   public abstract MConfig findToJobConfig(long jobId, String configName, Connection con);
+
+
+   /**
+    * fetch the job config for the DRIVER type for the given name
+    * @param jobId id of the job
+    * @param configName name of the config unique to this job and type
+    * @param conn Connection to the repository
+    * @return config object
+    */
+   public abstract MConfig findDriverJobConfig(long jobId, String configName, Connection con);
+
+
+   /**
+    * fetch the link config for the link type for the given name
+    * @param linkId id of the link
+    * @param configName name of the config unique to this link and type
+    * @param conn Connection to the repository
+    * @return config object
+    */
+   public abstract MConfig findLinkConfig(long linkId, String configName, Connection con);
+
+   /**
+    * Update the config object for the job
+    * @param jobId id of the job
+    * @param config name of the config
+    * @param type entity type updating the link config
+    * @param conn Connection to the repository
+    */
+   public abstract void updateJobConfig(long jobId, MConfig config, MConfigUpdateEntityType type,  Connection con);
+
+   /**
+    * Update the config object for the link
+    * @param linkId id of the link
+    * @param config name of the config
+    * @param type entity type updating the link config
+    * @param conn Connection to the repository
+    */
+   public abstract void updateLinkConfig(long linkId, MConfig config, MConfigUpdateEntityType type, Connection con);
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index e07676a..aa91661 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -33,6 +33,7 @@ import org.apache.sqoop.driver.DriverUpgrader;
 import org.apache.sqoop.json.DriverBean;
 import org.apache.sqoop.model.ConfigUtils;
 import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MConfigUpdateEntityType;
 import org.apache.sqoop.model.MConnector;
 import org.apache.sqoop.model.MDriver;
 import org.apache.sqoop.model.MDriverConfig;
@@ -326,6 +327,80 @@ public abstract class Repository {
    */
   public abstract MSubmission findLastSubmissionForJob(long jobId);
 
+ /**
+  * fetch the job config for the FROM type for the given name
+  * @param jobId id of the job
+  * @param configName name of the config unique to this job and type
+  * @return config object
+  */
+  public abstract MConfig findFromJobConfig(long jobId, String configName);
+
+
+  /**
+   * fetch the job config for the TO type for the given name
+   * @param jobId id of the job
+   * @param configName name of the config unique to this job and type
+   * @return config object
+   */
+  public abstract MConfig findToJobConfig(long jobId, String configName);
+
+
+  /**
+   * fetch the job config for the DRIVER type for the given name
+   * @param jobId id of the job
+   * @param configName name of the config unique to this job and type
+   * @return config object
+   */
+  public abstract MConfig findDriverJobConfig(long jobId, String configName);
+
+
+  /**
+   * fetch the link config for the link type for the given name
+   * @param linkId id of the link
+   * @param configName name of the config unique to this link and type
+   * @return config object
+   */
+  public abstract MConfig findLinkConfig(long linkId, String configName);
+
+
+  /**
+   * Update the config object for the job
+   * @param jobId id of the job
+   * @param config name of the config
+   * @param updateEntityType entity type updating the link config
+   */
+  public abstract void updateJobConfig(long jobId, MConfig config, MConfigUpdateEntityType updateEntityType);
+
+  /**
+   * Update the config object for the job
+   * @param jobId id of the job
+   * @param config name of the config
+   * @param updateEntityType entity type updating the link config
+   * @param tx database transaction
+   */
+  public abstract void updateJobConfig(long jobId, MConfig config, MConfigUpdateEntityType updateEntityType,  RepositoryTransaction tx);
+
+
+  /**
+   * Update the config object for the link
+   * @param linkId id of the link
+   * @param config name of the config
+   * @param updateEntityType entity type updating the link config
+   */
+  public abstract void updateLinkConfig(long linkId, MConfig config, MConfigUpdateEntityType updateEntityType);
+
+  /**
+   * Update the config object for the link
+   * @param linkId id of the link
+   * @param config name of the config
+   * @param updateEntityType entity type updating the link config
+   * @param tx database transaction
+   */
+  public abstract void updateLinkConfig(long linkId, MConfig config, MConfigUpdateEntityType updateEntityType, RepositoryTransaction tx);
+
+
+  /*********************Configurable Upgrade APIs ******************************/
+
   /**
    * Update the connector with the new data supplied in the
    * <tt>newConnector</tt>. Also Update all configs associated with this

http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java
index 6a23fc2..87d2d9c 100644
--- a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java
+++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java
@@ -26,6 +26,7 @@ import org.apache.sqoop.common.SupportedDirections;
 import org.apache.sqoop.driver.Driver;
 import org.apache.sqoop.error.code.CommonRepositoryError;
 import org.apache.sqoop.model.InputEditable;
+import org.apache.sqoop.model.MConfigUpdateEntityType;
 import org.apache.sqoop.model.MLongInput;
 import org.apache.sqoop.model.SubmissionError;
 import org.apache.sqoop.model.MBooleanInput;
@@ -1258,8 +1259,8 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler {
           Statement.RETURN_GENERATED_KEYS);
 
       // Register link type config
-      registerConfigs(connectorId, null /* No direction for LINK type config*/, mc.getLinkConfig().getConfigs(),
-          MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn);
+      registerConfigs(connectorId, null /* No direction for LINK type config */, mc.getLinkConfig()
+          .getConfigs(), MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn);
 
       // Register both from/to job type config for connector
       if (mc.getSupportedDirections().isDirectionSupported(Direction.FROM)) {
@@ -1622,7 +1623,6 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler {
     try {
       rsConnection = stmt.executeQuery();
 
-      //
       connectorConfigFetchStatement = conn.prepareStatement(crudQueries.getStmtSelectConfigForConfigurable());
       connectorConfigInputStatement = conn.prepareStatement(crudQueries.getStmtFetchLinkInput());
 
@@ -1719,7 +1719,6 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler {
         List<MConfig> toConnectorFromJobConfig = new ArrayList<MConfig>();
         List<MConfig> toConnectorToJobConfig = new ArrayList<MConfig>();
 
-        // ?? dont we need 2 different driver configs for the from/to?
         List<MConfig> driverConfig = new ArrayList<MConfig>();
 
         loadConnectorConfigs(toConnectorLinkConfig, toConnectorFromJobConfig, toConnectorToJobConfig,
@@ -1947,6 +1946,125 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler {
     return children;
   }
 
+  @Override
+  public MConfig findFromJobConfig(long jobId, String configName, Connection conn) {
+    MFromConfig fromConfigs = findJob(jobId, conn).getFromJobConfig();
+    if (fromConfigs != null) {
+      MConfig config = fromConfigs.getConfig(configName);
+      if (config == null) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0049, "for configName :" + configName);
+      }
+      return config;
+    }
+    throw new SqoopException(CommonRepositoryError.COMMON_0049, "for configName :" + configName);
+  }
+
+  @Override
+  public MConfig findToJobConfig(long jobId, String configName, Connection conn) {
+    MToConfig toConfigs = findJob(jobId, conn).getToJobConfig();
+    if (toConfigs != null) {
+      MConfig config = toConfigs.getConfig(configName);
+      if (config == null) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0050, "for configName :" + configName);
+      }
+      return config;
+    }
+    throw new SqoopException(CommonRepositoryError.COMMON_0050, "for configName :" + configName);
+  }
+
+  @Override
+  public MConfig findDriverJobConfig(long jobId, String configName, Connection conn) {
+    MDriverConfig driverConfigs = findJob(jobId, conn).getDriverConfig();
+    if (driverConfigs != null) {
+      MConfig config = driverConfigs.getConfig(configName);
+      if (config == null) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0051, "for configName :" + configName);
+      }
+      return config;
+    }
+    throw new SqoopException(CommonRepositoryError.COMMON_0051, "for configName :" + configName);
+  }
+
+  @Override
+  public MConfig findLinkConfig(long linkId, String configName, Connection conn) {
+    MConfig driverConfig = findLink(linkId, conn).getConnectorLinkConfig(configName);
+    if (driverConfig == null) {
+      throw new SqoopException(CommonRepositoryError.COMMON_0052, "for configName :" + configName);
+    }
+    return driverConfig;
+  }
+
+  @SuppressWarnings("resource")
+  @Override
+  public void updateJobConfig(long jobId, MConfig config, MConfigUpdateEntityType type,
+      Connection conn) {
+    List<MInput<?>> inputs = config.getInputs();
+    PreparedStatement updateStmt = null;
+
+    try {
+      updateStmt = conn.prepareStatement(crudQueries.getStmtUpdateJobInput());
+      for (MInput<?> input : inputs) {
+        if (input.isEmpty()) {
+          continue;
+        }
+        validateEditableConstraints(type, input);
+        updateStmt.setString(1, input.getUrlSafeValueString());
+        updateStmt.setLong(2, input.getPersistenceId());
+        updateStmt.setLong(3, jobId);
+        updateStmt.executeUpdate();
+      }
+    } catch (SQLException ex) {
+      logException(ex, jobId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0053, ex);
+    } finally {
+      closeStatements(updateStmt);
+    }
+  }
+
+  private void validateEditableConstraints(MConfigUpdateEntityType type, MInput<?> input) {
+    if (input.getEditable().equals(InputEditable.CONNECTOR_ONLY)
+        && type.equals(MConfigUpdateEntityType.USER)) {
+      throw new SqoopException(CommonRepositoryError.COMMON_0055);
+    }
+    if (input.getEditable().equals(InputEditable.USER_ONLY)
+        && type.equals(MConfigUpdateEntityType.CONNECTOR)) {
+      throw new SqoopException(CommonRepositoryError.COMMON_0056);
+    }
+  }
+
+  @Override
+  public void updateLinkConfig(long linkId, MConfig config, MConfigUpdateEntityType type,
+      Connection conn) {
+    List<MInput<?>> inputs = config.getInputs();
+    PreparedStatement updateStmt = null;
+    try {
+      updateStmt = conn.prepareStatement(crudQueries.getStmtUpdateLinkInput());
+      for (MInput<?> input : inputs) {
+        if (input.isEmpty()) {
+          continue;
+        }
+        validateEditableConstraints(type, input);
+        updateStmt.setString(1, input.getUrlSafeValueString());
+        updateStmt.setLong(2, input.getPersistenceId());
+        updateStmt.setLong(3, linkId);
+        updateStmt.executeUpdate();
+      }
+    } catch (SQLException ex) {
+      logException(ex, linkId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0054, ex);
+    } finally {
+      closeStatements(updateStmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String validationQuery() {
+    return "values(1)"; // Yes, this is valid PostgreSQL SQL
+  }
+
   /**
    * Load configs and corresponding inputs from Derby database.
    *
@@ -2305,10 +2423,8 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler {
     }
   }
 
-  private void createInputValues(String query,
-                                 long id,
-                                 List<MConfig> configs,
-                                 Connection conn) throws SQLException {
+  private void createInputValues(String query, long id, List<MConfig> configs, Connection conn)
+      throws SQLException {
     PreparedStatement stmt = null;
     int result;
 
@@ -2316,7 +2432,7 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler {
       stmt = conn.prepareStatement(query);
 
       for (MConfig config : configs) {
-        for (MInput input : config.getInputs()) {
+        for (MInput<?> input : config.getInputs()) {
           // Skip empty values as we're not interested in storing those in db
           if (input.isEmpty()) {
             continue;
@@ -2327,8 +2443,7 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler {
 
           result = stmt.executeUpdate();
           if (result != 1) {
-            throw new SqoopException(CommonRepositoryError.COMMON_0017,
-                Integer.toString(result));
+            throw new SqoopException(CommonRepositoryError.COMMON_0017, Integer.toString(result));
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java
index d61ff0b..3dbad01 100644
--- a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java
+++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java
@@ -277,6 +277,14 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery {
           + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNK_ENABLED) + " = ? "
           + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNK_ID) + " = ?";
 
+
+  // UPDATE the LINK Input
+  public static final String STMT_UPDATE_LINK_INPUT =
+      "UPDATE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_LINK_INPUT_NAME) + " SET "
+          + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNKI_VALUE) + " = ? "
+          + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNKI_INPUT) + " = ?"
+          + " AND " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNKI_LINK) + " = ?";
+
   // DML: Delete rows from link input table
   public static final String STMT_DELETE_LINK_INPUT =
       "DELETE FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_LINK_INPUT_NAME)
@@ -384,6 +392,13 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery {
           + CommonRepoUtils.escapeColumnName(COLUMN_SQB_ENABLED) + " = ? "
           + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQB_ID) + " = ?";
 
+  // UPDATE the JOB Input
+  public static final String STMT_UPDATE_JOB_INPUT =
+      "UPDATE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_JOB_INPUT_NAME) + " SET "
+          + CommonRepoUtils.escapeColumnName(COLUMN_SQBI_VALUE) + " = ? "
+          + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQBI_INPUT) + " = ?"
+          + " AND " + CommonRepoUtils.escapeColumnName(COLUMN_SQBI_JOB) + " = ?";
+
   // DML: Delete rows from job input table
   public static final String STMT_DELETE_JOB_INPUT =
       "DELETE FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_JOB_INPUT_NAME)
@@ -695,6 +710,10 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery {
     return STMT_ENABLE_LINK;
   }
 
+  public String getStmtUpdateLinkInput() {
+    return STMT_UPDATE_LINK_INPUT;
+  }
+
   public String getStmtDeleteLinkInput() {
     return STMT_DELETE_LINK_INPUT;
   }
@@ -739,6 +758,10 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery {
     return STMT_ENABLE_JOB;
   }
 
+  public String getStmtUpdateJobInput() {
+    return STMT_UPDATE_JOB_INPUT;
+  }
+
   public String getStmtDeleteJobInput() {
     return STMT_DELETE_JOB_INPUT;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
index bea5cd7..9ed7627 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java
@@ -526,7 +526,7 @@ abstract public class DerbyTestCase {
       // First config
       runInsertQuery("INSERT INTO SQOOP.SQ_INPUT"
           + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH, SQI_EDITABLE)"
-          + " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30, 'CONNECTOR_ONLY')");
+          + " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30, 'USER_ONLY')");
       runInsertQuery("INSERT INTO SQOOP.SQ_INPUT"
           + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH, SQI_EDITABLE)"
           + " VALUES('I2', " + (i * 2 + 1) + ", 1, 'MAP', false, 30, 'CONNECTOR_ONLY')");
@@ -534,10 +534,10 @@ abstract public class DerbyTestCase {
       // Second config
       runInsertQuery("INSERT INTO SQOOP.SQ_INPUT"
           + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH, SQI_EDITABLE)"
-          + " VALUES('I3', " + (i * 2 + 2) + ", 0, 'STRING', false, 30, 'CONNECTOR_ONLY')");
+          + " VALUES('I3', " + (i * 2 + 2) + ", 0, 'STRING', false, 30, 'USER_ONLY')");
       runInsertQuery("INSERT INTO SQOOP.SQ_INPUT"
           + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH, SQI_EDITABLE)"
-          + " VALUES('I4', " + (i * 2 + 2) + ", 1, 'MAP', false, 30, 'CONNECTOR_ONLY')");
+          + " VALUES('I4', " + (i * 2 + 2) + ", 1, 'MAP', false, 30, 'USER_ONLY')");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
index 6a248e9..b889b85 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
@@ -32,14 +32,18 @@ import java.util.Map;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MConfigUpdateEntityType;
 import org.apache.sqoop.model.MDriver;
+import org.apache.sqoop.model.MFromConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MMapInput;
 import org.apache.sqoop.model.MStringInput;
+import org.apache.sqoop.model.MToConfig;
 import org.apache.sqoop.error.code.CommonRepositoryError;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+
 /**
  * Test job methods on Derby repository.
  */
@@ -244,11 +248,19 @@ public class TestJobHandling extends DerbyTestCase {
 
     configs = job.getJobConfig(Direction.FROM).getConfigs();
     ((MStringInput)configs.get(0).getInputs().get(0)).setValue("Updated");
-    ((MMapInput)configs.get(0).getInputs().get(1)).setValue(null);
+    Map<String, String> newFromMap = new HashMap<String, String>();
+    newFromMap.put("1F", "foo");
+    newFromMap.put("2F", "bar");
+
+    ((MMapInput)configs.get(0).getInputs().get(1)).setValue(newFromMap);
 
     configs = job.getJobConfig(Direction.TO).getConfigs();
     ((MStringInput)configs.get(0).getInputs().get(0)).setValue("Updated");
-    ((MMapInput)configs.get(0).getInputs().get(1)).setValue(null);
+    Map<String, String> newToMap = new HashMap<String, String>();
+    newToMap.put("1T", "foo");
+    newToMap.put("2T", "bar");
+
+    ((MMapInput)configs.get(0).getInputs().get(1)).setValue(newToMap);
 
     configs = job.getDriverConfig().getConfigs();
     ((MStringInput)configs.get(0).getInputs().get(0)).setValue("Updated");
@@ -262,7 +274,7 @@ public class TestJobHandling extends DerbyTestCase {
 
     assertEquals(1, job.getPersistenceId());
     assertCountForTable("SQOOP.SQ_JOB", 4);
-    assertCountForTable("SQOOP.SQ_JOB_INPUT", 26);
+    assertCountForTable("SQOOP.SQ_JOB_INPUT", 28);
 
     MJob retrieved = handler.findJob(1, derbyConnection);
     assertEquals("name", retrieved.getName());
@@ -270,17 +282,17 @@ public class TestJobHandling extends DerbyTestCase {
     configs = job.getJobConfig(Direction.FROM).getConfigs();
     assertEquals(2, configs.size());
     assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
-    assertNull(configs.get(0).getInputs().get(1).getValue());
+    assertEquals(newFromMap, configs.get(0).getInputs().get(1).getValue());
     configs = job.getJobConfig(Direction.TO).getConfigs();
     assertEquals(2, configs.size());
     assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
-    assertNull(configs.get(0).getInputs().get(1).getValue());
+    assertEquals(newToMap, configs.get(0).getInputs().get(1).getValue());
 
     configs = retrieved.getDriverConfig().getConfigs();
     assertEquals(2, configs.size());
     assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
     assertNotNull(configs.get(0).getInputs().get(1).getValue());
-    assertEquals(((Map)configs.get(0).getInputs().get(1).getValue()).size(), 0);
+    assertEquals(((Map) configs.get(0).getInputs().get(1).getValue()).size(), 0);
   }
 
   @Test
@@ -323,11 +335,136 @@ public class TestJobHandling extends DerbyTestCase {
     assertCountForTable("SQOOP.SQ_JOB_INPUT", 0);
   }
 
+  @Test
+  public void testUpdateJobConfig() throws Exception {
+    loadJobsForLatestVersion();
+
+    assertCountForTable("SQOOP.SQ_JOB", 4);
+    assertCountForTable("SQOOP.SQ_JOB_INPUT", 24);
+    MJob job = handler.findJob(1, derbyConnection);
+
+    List<MConfig> fromConfigs = job.getJobConfig(Direction.FROM).getConfigs();
+    MConfig fromConfig = fromConfigs.get(0).clone(false);
+    MConfig newFromConfig = new MConfig(fromConfig.getName(), fromConfig.getInputs());
+
+    ((MStringInput) newFromConfig.getInputs().get(0)).setValue("FromJobConfigUpdated");
+
+    handler.updateJobConfig(job.getPersistenceId(), newFromConfig, MConfigUpdateEntityType.USER,
+        derbyConnection);
+
+    MJob updatedJob = handler.findJob(1, derbyConnection);
+    MFromConfig newFromConfigs = updatedJob.getFromJobConfig();
+    assertEquals(2, newFromConfigs.getConfigs().size());
+    MConfig updatedFromConfig = newFromConfigs.getConfigs().get(0);
+    assertEquals("FromJobConfigUpdated", updatedFromConfig.getInputs().get(0).getValue());
+
+    List<MConfig> toConfigs = job.getJobConfig(Direction.TO).getConfigs();
+    MConfig toConfig = toConfigs.get(0).clone(false);
+    MConfig newToConfig = new MConfig(toConfig.getName(), toConfig.getInputs());
+
+    ((MStringInput) newToConfig.getInputs().get(0)).setValue("ToJobConfigUpdated");
+
+    handler.updateJobConfig(job.getPersistenceId(), newToConfig, MConfigUpdateEntityType.USER,
+        derbyConnection);
+
+    updatedJob = handler.findJob(1, derbyConnection);
+    MToConfig newToConfigs = updatedJob.getToJobConfig();
+    assertEquals(2, newToConfigs.getConfigs().size());
+    MConfig updatedToConfig = newToConfigs.getConfigs().get(0);
+    assertEquals("ToJobConfigUpdated", updatedToConfig.getInputs().get(0).getValue());
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testIncorrectEntityCausingConfigUpdate() throws Exception {
+    loadJobsForLatestVersion();
+
+    assertCountForTable("SQOOP.SQ_JOB", 4);
+    assertCountForTable("SQOOP.SQ_JOB_INPUT", 24);
+    MJob job = handler.findJob(1, derbyConnection);
+
+    List<MConfig> fromConfigs = job.getJobConfig(Direction.FROM).getConfigs();
+    MConfig fromConfig = fromConfigs.get(0).clone(false);
+    MConfig newFromConfig = new MConfig(fromConfig.getName(), fromConfig.getInputs());
+    HashMap<String, String> newMap = new HashMap<String, String>();
+    newMap.put("1", "foo");
+    newMap.put("2", "bar");
+
+    ((MMapInput) newFromConfig.getInputs().get(1)).setValue(newMap);
+
+    handler.updateJobConfig(job.getPersistenceId(), newFromConfig, MConfigUpdateEntityType.USER,
+        derbyConnection);
+  }
+
+  @Test
+  public void testFindAndUpdateJobConfig() throws Exception {
+    loadJobsForLatestVersion();
+    MJob job = handler.findJob(1, derbyConnection);
+
+    assertCountForTable("SQOOP.SQ_JOB", 4);
+    assertCountForTable("SQOOP.SQ_JOB_INPUT", 24);
+    MConfig fromConfig = handler.findFromJobConfig(1, "C1JOB1", derbyConnection);
+    assertEquals("Value5", fromConfig.getInputs().get(0).getValue());
+    assertNull(fromConfig.getInputs().get(1).getValue());
+
+    MConfig toConfig = handler.findToJobConfig(1, "C2JOB2", derbyConnection);
+    assertEquals("Value11", toConfig.getInputs().get(0).getValue());
+    assertNull(toConfig.getInputs().get(1).getValue());
+    HashMap<String, String> newMap = new HashMap<String, String>();
+    newMap.put("1UPDATED", "foo");
+    newMap.put("2UPDATED", "bar");
+    ((MStringInput) toConfig.getInputs().get(0)).setValue("test");
+    ((MMapInput) toConfig.getInputs().get(1)).setValue(newMap);
+
+    handler.updateJobConfig(job.getPersistenceId(), toConfig, MConfigUpdateEntityType.USER,
+        derbyConnection);
+    assertEquals("test", toConfig.getInputs().get(0).getValue());
+    assertEquals(newMap, toConfig.getInputs().get(1).getValue());
+
+    MConfig driverConfig = handler.findDriverJobConfig(1, "d1", derbyConnection);
+    assertEquals("Value13", driverConfig.getInputs().get(0).getValue());
+    assertNull(driverConfig.getInputs().get(1).getValue());
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNonExistingFromConfigFetch() throws Exception {
+    loadJobsForLatestVersion();
+
+    assertCountForTable("SQOOP.SQ_JOB", 4);
+    assertCountForTable("SQOOP.SQ_JOB_INPUT", 24);
+    handler.findFromJobConfig(1, "Non-ExistingC1JOB1", derbyConnection);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNonExistingToConfigFetch() throws Exception {
+    loadJobsForLatestVersion();
+
+    assertCountForTable("SQOOP.SQ_JOB", 4);
+    assertCountForTable("SQOOP.SQ_JOB_INPUT", 24);
+    handler.findToJobConfig(1, "Non-ExistingC2JOB1", derbyConnection);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNonExistingDriverConfigFetch() throws Exception {
+    loadJobsForLatestVersion();
+
+    assertCountForTable("SQOOP.SQ_JOB", 4);
+    assertCountForTable("SQOOP.SQ_JOB_INPUT", 24);
+    handler.findDriverJobConfig(1, "Non-Existingd1", derbyConnection);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNonExistingJobConfig() throws Exception {
+    loadJobsForLatestVersion();
+
+    assertCountForTable("SQOOP.SQ_JOB", 4);
+    assertCountForTable("SQOOP.SQ_JOB_INPUT", 24);
+    // 11 does not exist
+    handler.findDriverJobConfig(11, "Non-d1", derbyConnection);
+  }
+
   public MJob getJob() {
-    return new MJob(1, 1, 1, 1,
-      handler.findConnector("A", derbyConnection).getFromConfig(),
-      handler.findConnector("A", derbyConnection).getToConfig(),
-      handler.findDriver(MDriver.DRIVER_NAME, derbyConnection).getDriverConfig()
-    );
+    return new MJob(1, 1, 1, 1, handler.findConnector("A", derbyConnection).getFromConfig(),
+        handler.findConnector("A", derbyConnection).getToConfig(), handler.findDriver(
+            MDriver.DRIVER_NAME, derbyConnection).getDriverConfig());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java
index 523464b..1ee7996 100644
--- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java
@@ -28,7 +28,9 @@ import java.util.List;
 
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MConfigUpdateEntityType;
 import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.model.MLinkConfig;
 import org.apache.sqoop.model.MMapInput;
 import org.apache.sqoop.model.MStringInput;
 import org.apache.sqoop.error.code.CommonRepositoryError;
@@ -287,6 +289,48 @@ public class TestLinkHandling extends DerbyTestCase {
     assertCountForTable("SQOOP.SQ_LINK_INPUT", 0);
   }
 
+  @Test
+  public void testUpdateLinkConfig() throws Exception {
+    loadLinksForLatestVersion();
+
+    assertCountForTable("SQOOP.SQ_LINK", 2);
+    assertCountForTable("SQOOP.SQ_LINK_INPUT", 8);
+    MLink link = handler.findLink(1, getDerbyDatabaseConnection());
+
+    List<MConfig> configs = link.getConnectorLinkConfig().getConfigs();
+    MConfig config = configs.get(0).clone(false);
+    MConfig newConfig = new MConfig(config.getName(), config.getInputs());
+
+    ((MStringInput) newConfig.getInputs().get(0)).setValue("LinkConfigUpdated");
+
+    handler.updateLinkConfig(link.getPersistenceId(), newConfig, MConfigUpdateEntityType.USER,
+        getDerbyDatabaseConnection());
+
+    MLink updatedLink = handler.findLink(1, getDerbyDatabaseConnection());
+    MLinkConfig newConfigs = updatedLink.getConnectorLinkConfig();
+    assertEquals(2, newConfigs.getConfigs().size());
+    MConfig updatedLinkConfig = newConfigs.getConfigs().get(0);
+    assertEquals("LinkConfigUpdated", updatedLinkConfig.getInputs().get(0).getValue());
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNonExistingLinkConfigFetch() throws Exception {
+    loadLinksForLatestVersion();
+    assertCountForTable("SQOOP.SQ_LINK", 2);
+    assertCountForTable("SQOOP.SQ_LINK_INPUT", 8);
+    handler.findLinkConfig(1, "Non-ExistingC1LINK1", getDerbyDatabaseConnection());
+  }
+
+  @Test
+  public void testLinkConfigFetch() throws Exception {
+    loadLinksForLatestVersion();
+    assertCountForTable("SQOOP.SQ_LINK", 2);
+    assertCountForTable("SQOOP.SQ_LINK_INPUT", 8);
+    MConfig config = handler.findLinkConfig(1, "C1LINK0", getDerbyDatabaseConnection());
+    assertEquals("Value1", config.getInputs().get(0).getValue());
+    assertNull(config.getInputs().get(1).getValue());
+  }
+
   public MLink getLink() {
     return new MLink(1, handler.findConnector("A", getDerbyDatabaseConnection()).getLinkConfig());
   }